From 5753a09366b3f4c13c3401c8f0f24661855f7122 Mon Sep 17 00:00:00 2001 From: Yifan Wu Date: Sun, 10 Oct 2021 17:20:53 -0700 Subject: [PATCH] Implement mpsc using semaphores. --- os/src/sync/mod.rs | 2 ++ os/src/sync/semaphore.rs | 45 ++++++++++++++++++++++++++ os/src/syscall/mod.rs | 6 ++++ os/src/syscall/sync.rs | 38 +++++++++++++++++++++- os/src/task/process.rs | 5 ++- user/src/bin/mpsc_sem.rs | 69 ++++++++++++++++++++++++++++++++++++++++ user/src/lib.rs | 9 ++++++ user/src/syscall.rs | 15 +++++++++ 8 files changed, 187 insertions(+), 2 deletions(-) create mode 100644 os/src/sync/semaphore.rs create mode 100644 user/src/bin/mpsc_sem.rs diff --git a/os/src/sync/mod.rs b/os/src/sync/mod.rs index 605ce165..ed39a630 100644 --- a/os/src/sync/mod.rs +++ b/os/src/sync/mod.rs @@ -1,5 +1,7 @@ mod up; mod mutex; +mod semaphore; pub use up::UPSafeCell; pub use mutex::{Mutex, MutexSpin, MutexBlocking}; +pub use semaphore::Semaphore; diff --git a/os/src/sync/semaphore.rs b/os/src/sync/semaphore.rs new file mode 100644 index 00000000..2f66b87f --- /dev/null +++ b/os/src/sync/semaphore.rs @@ -0,0 +1,45 @@ +use alloc::{sync::Arc, collections::VecDeque}; +use crate::task::{add_task, TaskControlBlock, current_task, block_current_and_run_next}; +use crate::sync::UPSafeCell; + +pub struct Semaphore { + pub inner: UPSafeCell, +} + +pub struct SemaphoreInner { + pub count: isize, + pub wait_queue: VecDeque>, +} + +impl Semaphore { + pub fn new(res_count: usize) -> Self { + Self { + inner: unsafe { UPSafeCell::new( + SemaphoreInner { + count: res_count as isize, + wait_queue: VecDeque::new(), + } + )}, + } + } + + pub fn up(&self) { + let mut inner = self.inner.exclusive_access(); + inner.count += 1; + if inner.count <= 0 { + if let Some(task) = inner.wait_queue.pop_front() { + add_task(task); + } + } + } + + pub fn down(&self) { + let mut inner = self.inner.exclusive_access(); + inner.count -= 1; + if inner.count < 0 { + inner.wait_queue.push_back(current_task().unwrap()); + drop(inner); + block_current_and_run_next(); + } + } +} diff --git a/os/src/syscall/mod.rs b/os/src/syscall/mod.rs index 7c8ef58a..eac72aae 100644 --- a/os/src/syscall/mod.rs +++ b/os/src/syscall/mod.rs @@ -18,6 +18,9 @@ const SYSCALL_WAITTID: usize = 1002; const SYSCALL_MUTEX_CREATE: usize = 1010; const SYSCALL_MUTEX_LOCK: usize = 1011; const SYSCALL_MUTEX_UNLOCK: usize = 1012; +const SYSCALL_SEMAPHORE_CREATE: usize = 1020; +const SYSCALL_SEMAPHORE_UP: usize = 1021; +const SYSCALL_SEMAPHORE_DOWN: usize = 1022; mod fs; mod process; @@ -51,6 +54,9 @@ pub fn syscall(syscall_id: usize, args: [usize; 3]) -> isize { SYSCALL_MUTEX_CREATE => sys_mutex_create(args[0] == 1), SYSCALL_MUTEX_LOCK => sys_mutex_lock(args[0]), SYSCALL_MUTEX_UNLOCK => sys_mutex_unlock(args[0]), + SYSCALL_SEMAPHORE_CREATE => sys_semaphore_creare(args[0]), + SYSCALL_SEMAPHORE_UP => sys_semaphore_up(args[0]), + SYSCALL_SEMAPHORE_DOWN => sys_semaphore_down(args[0]), _ => panic!("Unsupported syscall_id: {}", syscall_id), } } diff --git a/os/src/syscall/sync.rs b/os/src/syscall/sync.rs index c4808c52..370af848 100644 --- a/os/src/syscall/sync.rs +++ b/os/src/syscall/sync.rs @@ -1,5 +1,5 @@ use crate::task::{current_task, current_process, block_current_and_run_next}; -use crate::sync::{MutexSpin, MutexBlocking}; +use crate::sync::{MutexSpin, MutexBlocking, Semaphore}; use crate::timer::{get_time_ms, add_timer}; use alloc::sync::Arc; @@ -51,3 +51,39 @@ pub fn sys_mutex_unlock(mutex_id: usize) -> isize { mutex.unlock(); 0 } + +pub fn sys_semaphore_creare(res_count: usize) -> isize { + let process = current_process(); + let mut process_inner = process.inner_exclusive_access(); + let id = if let Some(id) = process_inner + .semaphore_list + .iter() + .enumerate() + .find(|(_, item)| item.is_none()) + .map(|(id, _)| id) { + process_inner.semaphore_list[id] = Some(Arc::new(Semaphore::new(res_count))); + id + } else { + process_inner.semaphore_list.push(Some(Arc::new(Semaphore::new(res_count)))); + process_inner.semaphore_list.len() - 1 + }; + id as isize +} + +pub fn sys_semaphore_up(sem_id: usize) -> isize { + let process = current_process(); + let process_inner = process.inner_exclusive_access(); + let sem = Arc::clone(process_inner.semaphore_list[sem_id].as_ref().unwrap()); + drop(process_inner); + sem.up(); + 0 +} + +pub fn sys_semaphore_down(sem_id: usize) -> isize { + let process = current_process(); + let process_inner = process.inner_exclusive_access(); + let sem = Arc::clone(process_inner.semaphore_list[sem_id].as_ref().unwrap()); + drop(process_inner); + sem.down(); + 0 +} diff --git a/os/src/task/process.rs b/os/src/task/process.rs index d7c58112..af48f21a 100644 --- a/os/src/task/process.rs +++ b/os/src/task/process.rs @@ -4,7 +4,7 @@ use crate::mm::{ translated_refmut, }; use crate::trap::{TrapContext, trap_handler}; -use crate::sync::{UPSafeCell, Mutex}; +use crate::sync::{UPSafeCell, Mutex, Semaphore}; use core::cell::RefMut; use super::id::RecycleAllocator; use super::TaskControlBlock; @@ -33,6 +33,7 @@ pub struct ProcessControlBlockInner { pub tasks: Vec>>, pub task_res_allocator: RecycleAllocator, pub mutex_list: Vec>>, + pub semaphore_list: Vec>>, } impl ProcessControlBlockInner { @@ -97,6 +98,7 @@ impl ProcessControlBlock { tasks: Vec::new(), task_res_allocator: RecycleAllocator::new(), mutex_list: Vec::new(), + semaphore_list: Vec::new(), })} }); // create a main thread, we should allocate ustack and trap_cx here @@ -210,6 +212,7 @@ impl ProcessControlBlock { tasks: Vec::new(), task_res_allocator: RecycleAllocator::new(), mutex_list: Vec::new(), + semaphore_list: Vec::new(), })} }); // add child diff --git a/user/src/bin/mpsc_sem.rs b/user/src/bin/mpsc_sem.rs new file mode 100644 index 00000000..71141e41 --- /dev/null +++ b/user/src/bin/mpsc_sem.rs @@ -0,0 +1,69 @@ +#![no_std] +#![no_main] + +#[macro_use] +extern crate user_lib; + +extern crate alloc; + +use user_lib::{semaphore_create, semaphore_up, semaphore_down}; +use user_lib::{thread_create, waittid}; +use user_lib::exit; +use alloc::vec::Vec; + +const SEM_MUTEX: usize = 0; +const SEM_EMPTY: usize = 1; +const SEM_EXISTED: usize = 2; +const BUFFER_SIZE: usize = 8; +static mut BUFFER: [usize; BUFFER_SIZE] = [0; BUFFER_SIZE]; +static mut FRONT: usize = 0; +static mut TAIL: usize = 0; +const PRODUCER_COUNT: usize = 4; +const NUMBER_PER_PRODUCER: usize = 100; + +unsafe fn producer(id: *const usize) -> ! { + let id = *id; + for _ in 0..NUMBER_PER_PRODUCER { + semaphore_down(SEM_EMPTY); + semaphore_down(SEM_MUTEX); + BUFFER[FRONT] = id; + FRONT = (FRONT + 1) % BUFFER_SIZE; + semaphore_up(SEM_MUTEX); + semaphore_up(SEM_EXISTED); + } + exit(0) +} + +unsafe fn consumer() -> ! { + for _ in 0..PRODUCER_COUNT * NUMBER_PER_PRODUCER { + semaphore_down(SEM_EXISTED); + semaphore_down(SEM_MUTEX); + print!("{} ", BUFFER[TAIL]); + TAIL = (TAIL + 1) % BUFFER_SIZE; + semaphore_up(SEM_MUTEX); + semaphore_up(SEM_EMPTY); + } + println!(""); + exit(0) +} + +#[no_mangle] +pub fn main() -> i32 { + // create semaphores + assert_eq!(semaphore_create(1) as usize, SEM_MUTEX); + assert_eq!(semaphore_create(BUFFER_SIZE) as usize, SEM_EMPTY); + assert_eq!(semaphore_create(0) as usize, SEM_EXISTED); + // create threads + let ids: Vec<_> = (0..PRODUCER_COUNT).collect(); + let mut threads = Vec::new(); + for i in 0..PRODUCER_COUNT { + threads.push(thread_create(producer as usize, &ids.as_slice()[i] as *const _ as usize)); + } + threads.push(thread_create(consumer as usize, 0)); + // wait for all threads to complete + for thread in threads.iter() { + waittid(*thread as usize); + } + println!("mpsc_sem passed!"); + 0 +} diff --git a/user/src/lib.rs b/user/src/lib.rs index 6f034d6e..afde2ecc 100644 --- a/user/src/lib.rs +++ b/user/src/lib.rs @@ -119,4 +119,13 @@ pub fn mutex_create() -> isize { sys_mutex_create(false) } pub fn mutex_blocking_create() -> isize { sys_mutex_create(true) } pub fn mutex_lock(mutex_id: usize) { sys_mutex_lock(mutex_id); } pub fn mutex_unlock(mutex_id: usize) { sys_mutex_unlock(mutex_id); } +pub fn semaphore_create(res_count: usize) -> isize { + sys_semaphore_create(res_count) +} +pub fn semaphore_up(sem_id: usize) { + sys_semaphore_up(sem_id); +} +pub fn semaphore_down(sem_id: usize) { + sys_semaphore_down(sem_id); +} diff --git a/user/src/syscall.rs b/user/src/syscall.rs index 0edcb9ce..64f738d2 100644 --- a/user/src/syscall.rs +++ b/user/src/syscall.rs @@ -18,6 +18,9 @@ const SYSCALL_WAITTID: usize = 1002; const SYSCALL_MUTEX_CREATE: usize = 1010; const SYSCALL_MUTEX_LOCK: usize = 1011; const SYSCALL_MUTEX_UNLOCK: usize = 1012; +const SYSCALL_SEMAPHORE_CREATE: usize = 1020; +const SYSCALL_SEMAPHORE_UP: usize = 1021; +const SYSCALL_SEMAPHORE_DOWN: usize = 1022; fn syscall(id: usize, args: [usize; 3]) -> isize { let mut ret: isize; @@ -113,3 +116,15 @@ pub fn sys_mutex_lock(id: usize) -> isize { pub fn sys_mutex_unlock(id: usize) -> isize { syscall(SYSCALL_MUTEX_UNLOCK, [id, 0, 0]) } + +pub fn sys_semaphore_create(res_count: usize) -> isize { + syscall(SYSCALL_SEMAPHORE_CREATE, [res_count, 0, 0]) +} + +pub fn sys_semaphore_up(sem_id: usize) -> isize { + syscall(SYSCALL_SEMAPHORE_UP, [sem_id, 0, 0]) +} + +pub fn sys_semaphore_down(sem_id: usize) -> isize { + syscall(SYSCALL_SEMAPHORE_DOWN, [sem_id, 0, 0]) +}