From: Suzuki

I found some races in the AIO callback path which are the root cause of
a few problems in the AIO code.  The race is between the aio callback
path (executed by softirqs) and aio_run_iocb().

(1) Suppose a retry returned -EIOCBRETRY and a wait has been queued.
    Whenever the wait event completes, aio_wake_function() is invoked
    from softirq context.  aio_wake_function() deletes the wait entry
    (iocb->ki_wait.task_list) and invokes kick_iocb() to kick the iocb
    and queue it back to the run_list.  Consider a situation like:

    SOFTIRQ:
	list_del_init(&wait->task_list);
	## SOFTIRQ gets scheduled out, or PROCESS executes on another CPU ##
	kick_iocb(iocb);

    PROCESS:
	/*
	 * Issue an additional retry to avoid waiting forever if
	 * no waits were queued (e.g. in case of a short read).
	 */
	if (list_empty(&iocb->ki_wait.task_list))
		kiocbSetKicked(iocb);

    So PROCESS may see the deleted (empty) wait list
    (iocb->ki_wait.task_list), kick the iocb and queue it up, and it
    will be retried again.  If the next retry happens before SOFTIRQ
    gets a chance to run, we run into problems.  There are two
    possibilities:

    (a) The iocb may get completed.  If this happens before the SOFTIRQ
	enters kick_iocb(iocb), we are about to kick an iocb which is no
	longer valid.  This might cause a kernel oops while taking

		spin_lock_irqsave(&ctx->ctx_lock, flags);

	since iocb->ki_ctx may be invalid.

    (b) The iocb might encounter another wait condition.  In this case
	the iocb would be queued waiting for some event.  This would make
	the SOFTIRQ hit

		WARN_ON((!list_empty(&iocb->ki_wait.task_list)));

	in queue_kicked_iocb().

(2) A similar race occurs when kicking the iocb:

    SOFTIRQ: [in kick_iocb()]
	if (!kiocbTryKick(iocb)) {
	# If we get scheduled out here, and PROCESS reaches the code below #
		queue_kicked_iocb(iocb);

    PROCESS: [in aio_run_iocb()]
	/* will make __queue_kicked_iocb succeed from here on */
	INIT_LIST_HEAD(&iocb->ki_run_list);
	/* we must queue the next iteration ourselves, if it
	 * has already been kicked */
	if (kiocbIsKicked(iocb)) {	<- will be true (kicked by kick_iocb)
		__queue_kicked_iocb(iocb);
	}

    Thus the iocb will get queued and may be retried.  This again can
    cause the problems described above.

Conclusion
==========

From this I conclude that the following operations in the callback path
for async iocbs should be atomic:

(*) Deletion of the wait queue entry.
(*) Kicking the iocb.
(*) Queueing it back to the run_list (which is already atomic with the
    ctx_lock held).

Also, the check for queued waits in aio_run_iocb() should be done with
the ctx_lock held.

Solution
========

I have created a patch which makes the above operations happen with
iocb->ki_ctx->ctx_lock held.  I have also moved the check

	> if (list_empty(&iocb->ki_wait.task_list))
	> 	kiocbSetKicked(iocb);

so that it is done under the ctx_lock.  Since we already hold the lock
before calling queue_kicked_iocb(), there is no need for a separate
queue_kicked_iocb() wrapper anymore, so I have renamed
__queue_kicked_iocb() to queue_kicked_iocb().  (A condensed sketch of
the resulting locking discipline appears at the end of this changelog.)

Testing
=======

I tested the proposed patch against 2.6.13-rc3-mm1 on a 2-way Intel
Xeon (with HT) with the aio-stress and fsx-linux test cases.  The tests
ran fine.
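As a condensed sketch of the locking discipline this patch establishes
(simplified from the patch below, not a drop-in replacement for it;
kick_async_iocb(), queue_kicked_iocb() and the kiocb flag helpers are
the patch's own names, not new API):

	/*
	 * Sketch: all three steps happen under ctx->ctx_lock, so
	 * aio_run_iocb(), which also takes ctx_lock, can never observe
	 * a half-finished kick.
	 */
	static void kick_async_iocb(struct kiocb *iocb)
	{
		struct kioctx *ctx = iocb->ki_ctx;
		unsigned long flags;
		int run = 0;

		spin_lock_irqsave(&ctx->ctx_lock, flags);
		/* (1) delete the wait queue entry */
		list_del_init(&iocb->ki_wait.task_list);
		/* (2) kick the iocb; (3) queue it, unless already kicked */
		if (!kiocbTryKick(iocb))
			run = queue_kicked_iocb(iocb);
		spin_unlock_irqrestore(&ctx->ctx_lock, flags);

		/* activate the aio work queue outside the lock */
		if (run)
			aio_queue_work(ctx);
	}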
Cc: Suparna Bhattacharya
Cc: Benjamin LaHaise
Signed-off-by: Andrew Morton
---

 fs/aio.c |   54 ++++++++++++++++++++++++++++++------------------------
 1 files changed, 30 insertions(+), 24 deletions(-)

diff -puN fs/aio.c~aio-fix-races-in-callback-path fs/aio.c
--- devel/fs/aio.c~aio-fix-races-in-callback-path	2005-07-27 12:43:19.000000000 -0700
+++ devel-akpm/fs/aio.c	2005-07-27 12:43:19.000000000 -0700
@@ -608,7 +608,7 @@ static void unuse_mm(struct mm_struct *m
  * Should be called with the spin lock iocb->ki_ctx->ctx_lock
  * held
  */
-static inline int __queue_kicked_iocb(struct kiocb *iocb)
+static inline int queue_kicked_iocb(struct kiocb *iocb)
 {
 	struct kioctx	*ctx = iocb->ki_ctx;
 
@@ -723,13 +723,6 @@ static ssize_t aio_run_iocb(struct kiocb
 			aio_complete(iocb, ret, 0);
 			/* must not access the iocb after this */
 		}
-	} else {
-		/*
-		 * Issue an additional retry to avoid waiting forever if
-		 * no waits were queued (e.g. in case of a short read).
-		 */
-		if (list_empty(&iocb->ki_wait.task_list))
-			kiocbSetKicked(iocb);
 	}
 out:
 	spin_lock_irq(&ctx->ctx_lock);
@@ -740,17 +733,23 @@ out:
 	 * and know that there is more left to go,
 	 * this is where we let go so that a subsequent
 	 * "kick" can start the next iteration
+	 *
+	 * Issue an additional retry to avoid waiting forever if
+	 * no waits were queued (e.g. in case of a short read).
+	 * (Should be done with ctx_lock held.)
 	 */
-	/* will make __queue_kicked_iocb succeed from here on */
+	if (list_empty(&iocb->ki_wait.task_list))
+		kiocbSetKicked(iocb);
+	/* will make queue_kicked_iocb succeed from here on */
 	INIT_LIST_HEAD(&iocb->ki_run_list);
 
 	/* we must queue the next iteration ourselves, if it
 	 * has already been kicked */
 	if (kiocbIsKicked(iocb)) {
-		__queue_kicked_iocb(iocb);
+		queue_kicked_iocb(iocb);
 
 		/*
-		 * __queue_kicked_iocb will always return 1 here, because
+		 * queue_kicked_iocb will always return 1 here, because
 		 * iocb->ki_run_list is empty at this point so it should
 		 * be safe to unconditionally queue the context into the
 		 * work queue.
@@ -869,21 +868,31 @@ static void aio_kick_handler(void *data)
 
 
 /*
- * Called by kick_iocb to queue the kiocb for retry
- * and if required activate the aio work queue to process
- * it
+ * Kicking an async iocb.
+ * The following operations for the async iocbs should be atomic
+ * to avoid races with the aio_run_iocb() code.
+ * (1) Deleting the wait queue entry.
+ * (2) Kicking the iocb.
+ * (3) Queue the iocb back to run_list.
+ * Holds the ctx->ctx_lock to avoid races.
  */
-static void queue_kicked_iocb(struct kiocb *iocb)
+static void kick_async_iocb(struct kiocb *iocb)
 {
 	struct kioctx	*ctx = iocb->ki_ctx;
 	unsigned long flags;
 	int run = 0;
 
-	WARN_ON((!list_empty(&iocb->ki_wait.task_list)));
-
 	spin_lock_irqsave(&ctx->ctx_lock, flags);
-	run = __queue_kicked_iocb(iocb);
+	list_del_init(&iocb->ki_wait.task_list);
+	/* If its already kicked we shouldn't queue it again */
+	if (!kiocbTryKick(iocb)) {
+		run = queue_kicked_iocb(iocb);
+	}
 	spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+
+	/* Activate the aio worker queue if we have successfully queued
+	 * the iocb, so that it can be processed
+	 */
 	if (run)
 		aio_queue_work(ctx);
 }
@@ -900,15 +909,13 @@ void fastcall kick_iocb(struct kiocb *io
 	/* sync iocbs are easy: they can only ever be executing from a
 	 * single context.
 	 */
 	if (is_sync_kiocb(iocb)) {
+		list_del_init(&iocb->ki_wait.task_list);
 		kiocbSetKicked(iocb);
 		wake_up_process(iocb->ki_obj.tsk);
 		return;
-	}
+	} else
+		kick_async_iocb(iocb);
 
-	/* If its already kicked we shouldn't queue it again */
-	if (!kiocbTryKick(iocb)) {
-		queue_kicked_iocb(iocb);
-	}
 }
 EXPORT_SYMBOL(kick_iocb);
@@ -1460,7 +1467,6 @@ static int aio_wake_function(wait_queue_
 {
 	struct kiocb *iocb = container_of(wait, struct kiocb, ki_wait);
 
-	list_del_init(&wait->task_list);
 	kick_iocb(iocb);
 	return 1;
 }
_
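For reference, here is a condensed view of the retry tail of
aio_run_iocb() with this patch applied (assembled from the hunks above;
the surrounding retry logic and the final unlock are elided):

	spin_lock_irq(&ctx->ctx_lock);
	/*
	 * The short-read check now runs under ctx_lock, so it cannot
	 * race with a softirq deleting the wait entry in
	 * kick_async_iocb().
	 */
	if (list_empty(&iocb->ki_wait.task_list))
		kiocbSetKicked(iocb);

	/* will make queue_kicked_iocb succeed from here on */
	INIT_LIST_HEAD(&iocb->ki_run_list);

	/* queue the next iteration ourselves if it was already kicked */
	if (kiocbIsKicked(iocb))
		queue_kicked_iocb(iocb);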