diff --git a/crate/sync/Cargo.toml b/crate/sync/Cargo.toml deleted file mode 100644 index dbd421e0..00000000 --- a/crate/sync/Cargo.toml +++ /dev/null @@ -1,6 +0,0 @@ -[package] -name = "sync-test" -version = "0.1.0" -authors = ["WangRunji "] - -[dependencies] diff --git a/crate/sync/src/main.rs b/crate/sync/src/main.rs deleted file mode 100644 index 725a6330..00000000 --- a/crate/sync/src/main.rs +++ /dev/null @@ -1,9 +0,0 @@ -//! entrance to test the communication in processes with solving five philosophers problem - -mod monitor; -mod mutex; - -fn main() { - // mutex::main(); - monitor::main(); -} diff --git a/crate/sync/src/monitor.rs b/crate/sync/src/monitor.rs deleted file mode 100644 index 09d42094..00000000 --- a/crate/sync/src/monitor.rs +++ /dev/null @@ -1,94 +0,0 @@ -//! solve the five philosophers problem with monitor - -use std::sync::{Arc, Condvar, Mutex}; -use std::thread; -use std::time::Duration; - -struct Philosopher { - name: String, - left: usize, - right: usize, -} - -impl Philosopher { - fn new(name: &str, left: usize, right: usize) -> Philosopher { - Philosopher { - name: name.to_string(), - left, - right, - } - } - - fn eat(&self, table: &Table) { - { - let mut fork_status = table.fork_status.lock().unwrap(); - if fork_status[self.left] { - fork_status = table.fork_condvar[self.left].wait(fork_status).unwrap() - } - fork_status[self.left] = true; - if fork_status[self.right] { - fork_status = table.fork_condvar[self.right].wait(fork_status).unwrap() - } - fork_status[self.right] = true; - } - println!("{} is eating.", self.name); - thread::sleep(Duration::from_secs(1)); - { - let mut fork_status = table.fork_status.lock().unwrap(); - fork_status[self.left] = false; - fork_status[self.right] = false; - table.fork_condvar[self.left].notify_one(); - table.fork_condvar[self.right].notify_one(); - } - } - - fn think(&self) { - println!("{} is thinking.", self.name); - thread::sleep(Duration::from_secs(1)); - } -} - -struct Table { - fork_status: Mutex>, - fork_condvar: Vec, -} - -// the main function to test -pub fn main() { - let table = Arc::new(Table { - fork_status: Mutex::new(vec![false; 5]), - fork_condvar: vec![ - Condvar::new(), - Condvar::new(), - Condvar::new(), - Condvar::new(), - Condvar::new(), - ], - }); - - let philosophers = vec![ - Philosopher::new("1", 0, 1), - Philosopher::new("2", 1, 2), - Philosopher::new("3", 2, 3), - Philosopher::new("4", 3, 4), - Philosopher::new("5", 0, 4), - ]; - - let handles: Vec<_> = philosophers - .into_iter() - .map(|p| { - let table = table.clone(); - - thread::spawn(move || { - for _ in 0..5 { - p.think(); - p.eat(&table); - } - }) - }) - .collect(); - - for h in handles { - h.join().unwrap(); - } -} diff --git a/crate/sync/src/mutex.rs b/crate/sync/src/mutex.rs deleted file mode 100644 index b20efe79..00000000 --- a/crate/sync/src/mutex.rs +++ /dev/null @@ -1,77 +0,0 @@ -//! solve the five philosophers problem with mutex - -use std::sync::{Arc, Mutex}; -use std::thread; -use std::time::Duration; - -struct Philosopher { - name: String, - left: usize, - right: usize, -} - -impl Philosopher { - fn new(name: &str, left: usize, right: usize) -> Philosopher { - Philosopher { - name: name.to_string(), - left, - right, - } - } - - fn eat(&self, table: &Table) { - let _left = table.forks[self.left].lock().unwrap(); - let _right = table.forks[self.right].lock().unwrap(); - - println!("{} is eating.", self.name); - thread::sleep(Duration::from_secs(1)); - } - - fn think(&self) { - println!("{} is thinking.", self.name); - thread::sleep(Duration::from_secs(1)); - } -} - -struct Table { - forks: Vec>, -} - -// the main function to test -pub fn main() { - let table = Arc::new(Table { - forks: vec![ - Mutex::new(()), - Mutex::new(()), - Mutex::new(()), - Mutex::new(()), - Mutex::new(()), - ], - }); - - let philosophers = vec![ - Philosopher::new("1", 0, 1), - Philosopher::new("2", 1, 2), - Philosopher::new("3", 2, 3), - Philosopher::new("4", 3, 4), - Philosopher::new("5", 0, 4), - ]; - - let handles: Vec<_> = philosophers - .into_iter() - .map(|p| { - let table = table.clone(); - - thread::spawn(move || { - for _ in 0..5 { - p.think(); - p.eat(&table); - } - }) - }) - .collect(); - - for h in handles { - h.join().unwrap(); - } -} diff --git a/kernel/src/fs/pipe.rs b/kernel/src/fs/pipe.rs index 48b4c0c6..809c7cd3 100644 --- a/kernel/src/fs/pipe.rs +++ b/kernel/src/fs/pipe.rs @@ -1,6 +1,5 @@ //! Implement INode for Pipe -use crate::sync::Condvar; use crate::sync::{Event, EventBus, SpinNoIrqLock as Mutex}; use crate::syscall::SysError::EAGAIN; use alloc::boxed::Box; diff --git a/kernel/src/process/proc.rs b/kernel/src/process/proc.rs index f9e98aa9..dc1bfce4 100644 --- a/kernel/src/process/proc.rs +++ b/kernel/src/process/proc.rs @@ -9,7 +9,7 @@ use crate::memory::{ phys_to_virt, ByFrame, Delay, File, GlobalFrameAlloc, KernelStack, MemoryAttr, MemorySet, Read, }; use crate::process::thread::THREADS; -use crate::sync::{Condvar, Event, EventBus, SpinLock, SpinNoIrqLock as Mutex}; +use crate::sync::{Event, EventBus, SpinLock, SpinNoIrqLock as Mutex}; use crate::{ signal::{Siginfo, Signal, SignalAction, SignalStack, Sigset}, syscall::handle_syscall, diff --git a/kernel/src/process/structs.rs b/kernel/src/process/structs.rs index 52c0544f..a44f7526 100644 --- a/kernel/src/process/structs.rs +++ b/kernel/src/process/structs.rs @@ -5,7 +5,7 @@ use crate::ipc::SemProc; use crate::memory::{ phys_to_virt, ByFrame, Delay, File, GlobalFrameAlloc, KernelStack, MemoryAttr, MemorySet, Read, }; -use crate::sync::{Condvar, SpinLock, SpinNoIrqLock as Mutex}; +use crate::sync::{SpinLock, SpinNoIrqLock as Mutex}; use crate::{ signal::{Siginfo, Signal, SignalAction, SignalStack, Sigset}, syscall::handle_syscall, diff --git a/kernel/src/process/thread.rs b/kernel/src/process/thread.rs index ac692b75..cb970339 100644 --- a/kernel/src/process/thread.rs +++ b/kernel/src/process/thread.rs @@ -17,7 +17,7 @@ use crate::memory::{ phys_to_virt, ByFrame, Delay, File, GlobalFrameAlloc, KernelStack, MemoryAttr, MemorySet, Read, }; use crate::process::structs::ElfExt; -use crate::sync::{Condvar, EventBus, SpinLock, SpinNoIrqLock as Mutex}; +use crate::sync::{EventBus, SpinLock, SpinNoIrqLock as Mutex}; use crate::{ signal::{handle_signal, Siginfo, Signal, SignalAction, SignalStack, Sigset}, syscall::handle_syscall, diff --git a/kernel/src/sync/event_bus.rs b/kernel/src/sync/event_bus.rs index 09b20abd..263ebdac 100644 --- a/kernel/src/sync/event_bus.rs +++ b/kernel/src/sync/event_bus.rs @@ -21,6 +21,10 @@ bitflags! { const PROCESS_QUIT = 1 << 10; const CHILD_PROCESS_QUIT = 1 << 11; const RECEIVE_SIGNAL = 1 << 12; + + /// Semaphore + const SEMAPHORE_REMOVED = 1 << 20; + const SEMAPHORE_CAN_ACQUIRE = 1 << 21; } } @@ -59,6 +63,10 @@ impl EventBus { pub fn subscribe(&mut self, callback: EventHandler) { self.callbacks.push(callback); } + + pub fn get_callback_len(&self) -> usize { + self.callbacks.len() + } } pub fn wait_for_event(bus: Arc>, mask: Event) -> impl Future { diff --git a/kernel/src/sync/semaphore.rs b/kernel/src/sync/semaphore.rs index a8e46f0f..b63fc7e0 100644 --- a/kernel/src/sync/semaphore.rs +++ b/kernel/src/sync/semaphore.rs @@ -2,23 +2,27 @@ //! //! Same as [std::sync::Semaphore at rust 1.7.0](https://docs.rs/std-semaphore/0.1.0/std_semaphore/) -use super::Condvar; -use super::SpinNoIrqLock as Mutex; +use super::{Event, EventBus, SpinNoIrqLock as Mutex}; use crate::syscall::SysError; +use alloc::boxed::Box; +use alloc::sync::Arc; use core::cell::Cell; +use core::future::Future; use core::ops::Deref; +use core::pin::Pin; +use core::task::{Context, Poll}; /// A counting, blocking, semaphore. pub struct Semaphore { // value and removed - lock: Mutex, - cvar: Condvar, + lock: Arc>, } struct SemaphoreInner { count: isize, pid: usize, removed: bool, + eventbus: EventBus, } /// An RAII guard which will release a resource acquired from a semaphore when @@ -35,18 +39,19 @@ impl Semaphore { /// available. It is valid to initialize a semaphore with a negative count. pub fn new(count: isize) -> Semaphore { Semaphore { - lock: Mutex::new(SemaphoreInner { + lock: Arc::new(Mutex::new(SemaphoreInner { count, removed: false, pid: 0, - }), - cvar: Condvar::new(), + eventbus: EventBus::default(), + })), } } pub fn remove(&self) { - self.lock.lock().removed = false; - self.cvar.notify_all(); + let mut inner = self.lock.lock(); + inner.removed = true; + inner.eventbus.set(Event::SEMAPHORE_REMOVED); } /// Acquires a resource of this semaphore, blocking the current thread until @@ -54,17 +59,43 @@ impl Semaphore { /// /// This method will block until the internal count of the semaphore is at /// least 1. - pub fn acquire(&self) -> Result<(), SysError> { - let mut inner = self.lock.lock(); - while !inner.removed && inner.count <= 0 { - inner = self.cvar.wait(inner); + pub async fn acquire(&self) -> Result<(), SysError> { + #[must_use = "future does nothing unless polled/`await`-ed"] + struct SemaphoreFuture { + inner: Arc>, } - if inner.removed { - Err(SysError::EIDRM) - } else { - inner.count -= 1; - Ok(()) + + impl<'a> Future for SemaphoreFuture { + type Output = Result<(), SysError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let mut inner = self.inner.lock(); + if inner.removed { + return Poll::Ready(Err(SysError::EIDRM)); + } else if inner.count >= 1 { + inner.count -= 1; + if inner.count < 1 { + inner.eventbus.clear(Event::SEMAPHORE_CAN_ACQUIRE); + } + return Poll::Ready(Ok(())); + } + + let waker = cx.waker().clone(); + inner.eventbus.subscribe(Box::new({ + move |_| { + waker.wake_by_ref(); + true + } + })); + + return Poll::Pending; + } } + + let future = SemaphoreFuture { + inner: self.lock.clone(), + }; + future.await } /// Release a resource from this semaphore. @@ -72,8 +103,11 @@ impl Semaphore { /// This will increment the number of resources in this semaphore by 1 and /// will notify any pending waiters in `acquire` or `access` if necessary. pub fn release(&self) { - self.lock.lock().count += 1; - self.cvar.notify_one(); + let mut inner = self.lock.lock(); + inner.count += 1; + if inner.count >= 1 { + inner.eventbus.set(Event::SEMAPHORE_CAN_ACQUIRE); + } } /// Acquires a resource of this semaphore, returning an RAII guard to @@ -81,8 +115,8 @@ impl Semaphore { /// /// This function is semantically equivalent to an `acquire` followed by a /// `release` when the guard returned is dropped. - pub fn access(&self) -> Result { - self.acquire()?; + pub async fn access(&self) -> Result, SysError> { + self.acquire().await?; Ok(SemaphoreGuard { sem: self }) } @@ -90,44 +124,26 @@ impl Semaphore { pub fn get(&self) -> isize { self.lock.lock().count } + pub fn get_ncnt(&self) -> usize { - self.cvar.wait_queue_len() + self.lock.lock().eventbus.get_callback_len() } pub fn get_pid(&self) -> usize { self.lock.lock().pid } + pub fn set_pid(&self, pid: usize) { self.lock.lock().pid = pid; } /// Set the current count pub fn set(&self, value: isize) { - self.lock.lock().count = value; - } - - /// Modify by k atomically. when wait is false avoid waiting. unused - pub fn modify(&self, k: isize, wait: bool) -> Result { - if k > 0 { - self.lock.lock().count += k; - self.cvar.notify_one(); - } else if k <= 0 { - let mut inner = self.lock.lock(); - let mut temp_k = k; - while inner.count + temp_k < 0 { - if wait == false { - return Err(()); - } - temp_k += inner.count; - inner.count = 0; - inner = self.cvar.wait(inner); - } - inner.count += temp_k; - if inner.count > 0 { - self.cvar.notify_one(); - } + let mut inner = self.lock.lock(); + inner.count = value; + if inner.count >= 1 { + inner.eventbus.set(Event::SEMAPHORE_CAN_ACQUIRE); } - Ok(0) } } diff --git a/kernel/src/syscall/ipc.rs b/kernel/src/syscall/ipc.rs index 2a480d98..22eb0b58 100644 --- a/kernel/src/syscall/ipc.rs +++ b/kernel/src/syscall/ipc.rs @@ -27,9 +27,9 @@ impl Syscall<'_> { Ok(id) } - pub fn sys_semop(&self, id: usize, ops: *const SemBuf, num_ops: usize) -> SysResult { + pub async fn sys_semop(&self, id: usize, ops: UserInPtr, num_ops: usize) -> SysResult { info!("semop: id: {}", id); - let ops = unsafe { self.vm().check_read_array(ops, num_ops)? }; + let ops = ops.read_array(num_ops)?; let sem_array = self.process().semaphores.get(id).ok_or(SysError::EINVAL)?; sem_array.otime(); @@ -42,7 +42,7 @@ impl Syscall<'_> { let _result = match op { 1 => sem.release(), - -1 => sem.acquire()?, + -1 => sem.acquire().await?, _ => unimplemented!("Semaphore: semop.(Not 1/-1)"), }; sem.set_pid(self.process().pid.get()); diff --git a/kernel/src/syscall/mod.rs b/kernel/src/syscall/mod.rs index 88e488c2..e9b4e979 100644 --- a/kernel/src/syscall/mod.rs +++ b/kernel/src/syscall/mod.rs @@ -349,7 +349,10 @@ impl Syscall<'_> { #[cfg(not(target_arch = "mips"))] SYS_SEMGET => self.sys_semget(args[0], args[1], args[2]), #[cfg(not(target_arch = "mips"))] - SYS_SEMOP => self.sys_semop(args[0], args[1] as *const SemBuf, args[2]), + SYS_SEMOP => { + self.sys_semop(args[0], UserInPtr::from(args[1]), args[2]) + .await + } #[cfg(not(target_arch = "mips"))] SYS_SEMCTL => self.sys_semctl(args[0], args[1], args[2], args[3]), SYS_MSGGET => self.unimplemented("msgget", Ok(0)), @@ -508,7 +511,7 @@ impl Syscall<'_> { Ok(0) } SYS_IPC => match args[0] { - 1 => self.sys_semop(args[1], args[2] as *const SemBuf, args[3]), + 1 => self.sys_semop(args[1], UserInPtr::from(args[2]), args[3]), 2 => self.sys_semget(args[1], args[2] as isize, args[3]), 3 => self.sys_semctl(args[1], args[2], args[3], args[4]), _ => return None,