Loading drivers/md/kcopyd.c +73 −59 Original line number Diff line number Diff line Loading @@ -26,14 +26,6 @@ #include "kcopyd.h" #include "dm.h" static struct workqueue_struct *_kcopyd_wq; static struct work_struct _kcopyd_work; static void wake(void) { queue_work(_kcopyd_wq, &_kcopyd_work); } /*----------------------------------------------------------------- * Each kcopyd client has its own little pool of preallocated * pages for kcopyd io. Loading @@ -50,8 +42,30 @@ struct dm_kcopyd_client { wait_queue_head_t destroyq; atomic_t nr_jobs; struct workqueue_struct *kcopyd_wq; struct work_struct kcopyd_work; /* * We maintain three lists of jobs: * * i) jobs waiting for pages * ii) jobs that have pages, and are waiting for the io to be issued. * iii) jobs that have completed. * * All three of these are protected by job_lock. */ spinlock_t job_lock; struct list_head complete_jobs; struct list_head io_jobs; struct list_head pages_jobs; }; static void wake(struct dm_kcopyd_client *kc) { queue_work(kc->kcopyd_wq, &kc->kcopyd_work); } static struct page_list *alloc_pl(void) { struct page_list *pl; Loading Loading @@ -209,21 +223,6 @@ struct kcopyd_job { static struct kmem_cache *_job_cache; static mempool_t *_job_pool; /* * We maintain three lists of jobs: * * i) jobs waiting for pages * ii) jobs that have pages, and are waiting for the io to be issued. * iii) jobs that have completed. * * All three of these are protected by job_lock. */ static DEFINE_SPINLOCK(_job_lock); static LIST_HEAD(_complete_jobs); static LIST_HEAD(_io_jobs); static LIST_HEAD(_pages_jobs); static int jobs_init(void) { _job_cache = KMEM_CACHE(kcopyd_job, 0); Loading @@ -241,10 +240,6 @@ static int jobs_init(void) static void jobs_exit(void) { BUG_ON(!list_empty(&_complete_jobs)); BUG_ON(!list_empty(&_io_jobs)); BUG_ON(!list_empty(&_pages_jobs)); mempool_destroy(_job_pool); kmem_cache_destroy(_job_cache); _job_pool = NULL; Loading @@ -255,18 +250,19 @@ static void jobs_exit(void) * Functions to push and pop a job onto the head of a given job * list. */ static struct kcopyd_job *pop(struct list_head *jobs) static struct kcopyd_job *pop(struct list_head *jobs, struct dm_kcopyd_client *kc) { struct kcopyd_job *job = NULL; unsigned long flags; spin_lock_irqsave(&_job_lock, flags); spin_lock_irqsave(&kc->job_lock, flags); if (!list_empty(jobs)) { job = list_entry(jobs->next, struct kcopyd_job, list); list_del(&job->list); } spin_unlock_irqrestore(&_job_lock, flags); spin_unlock_irqrestore(&kc->job_lock, flags); return job; } Loading @@ -274,10 +270,11 @@ static struct kcopyd_job *pop(struct list_head *jobs) static void push(struct list_head *jobs, struct kcopyd_job *job) { unsigned long flags; struct dm_kcopyd_client *kc = job->kc; spin_lock_irqsave(&_job_lock, flags); spin_lock_irqsave(&kc->job_lock, flags); list_add_tail(&job->list, jobs); spin_unlock_irqrestore(&_job_lock, flags); spin_unlock_irqrestore(&kc->job_lock, flags); } /* Loading Loading @@ -310,6 +307,7 @@ static int run_complete_job(struct kcopyd_job *job) static void complete_io(unsigned long error, void *context) { struct kcopyd_job *job = (struct kcopyd_job *) context; struct dm_kcopyd_client *kc = job->kc; if (error) { if (job->rw == WRITE) Loading @@ -318,21 +316,21 @@ static void complete_io(unsigned long error, void *context) job->read_err = 1; if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) { push(&_complete_jobs, job); wake(); push(&kc->complete_jobs, job); wake(kc); return; } } if (job->rw == WRITE) push(&_complete_jobs, job); push(&kc->complete_jobs, job); else { job->rw = WRITE; push(&_io_jobs, job); push(&kc->io_jobs, job); } wake(); wake(kc); } /* Loading Loading @@ -369,7 +367,7 @@ static int run_pages_job(struct kcopyd_job *job) r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages); if (!r) { /* this job is ready for io */ push(&_io_jobs, job); push(&job->kc->io_jobs, job); return 0; } Loading @@ -384,12 +382,13 @@ static int run_pages_job(struct kcopyd_job *job) * Run through a list for as long as possible. Returns the count * of successful jobs. */ static int process_jobs(struct list_head *jobs, int (*fn) (struct kcopyd_job *)) static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc, int (*fn) (struct kcopyd_job *)) { struct kcopyd_job *job; int r, count = 0; while ((job = pop(jobs))) { while ((job = pop(jobs, kc))) { r = fn(job); Loading @@ -399,7 +398,7 @@ static int process_jobs(struct list_head *jobs, int (*fn) (struct kcopyd_job *)) job->write_err = (unsigned long) -1L; else job->read_err = 1; push(&_complete_jobs, job); push(&kc->complete_jobs, job); break; } Loading @@ -421,8 +420,11 @@ static int process_jobs(struct list_head *jobs, int (*fn) (struct kcopyd_job *)) /* * kcopyd does this every time it's woken up. */ static void do_work(struct work_struct *ignored) static void do_work(struct work_struct *work) { struct dm_kcopyd_client *kc = container_of(work, struct dm_kcopyd_client, kcopyd_work); /* * The order that these are called is *very* important. * complete jobs can free some pages for pages jobs. Loading @@ -430,9 +432,9 @@ static void do_work(struct work_struct *ignored) * list. io jobs call wake when they complete and it all * starts again. */ process_jobs(&_complete_jobs, run_complete_job); process_jobs(&_pages_jobs, run_pages_job); process_jobs(&_io_jobs, run_io_job); process_jobs(&kc->complete_jobs, kc, run_complete_job); process_jobs(&kc->pages_jobs, kc, run_pages_job); process_jobs(&kc->io_jobs, kc, run_io_job); } /* Loading @@ -442,9 +444,10 @@ static void do_work(struct work_struct *ignored) */ static void dispatch_job(struct kcopyd_job *job) { atomic_inc(&job->kc->nr_jobs); push(&_pages_jobs, job); wake(); struct dm_kcopyd_client *kc = job->kc; atomic_inc(&kc->nr_jobs); push(&kc->pages_jobs, job); wake(kc); } #define SUB_JOB_SIZE 128 Loading Loading @@ -625,15 +628,7 @@ static int kcopyd_init(void) return r; } _kcopyd_wq = create_singlethread_workqueue("kcopyd"); if (!_kcopyd_wq) { jobs_exit(); mutex_unlock(&kcopyd_init_lock); return -ENOMEM; } kcopyd_clients++; INIT_WORK(&_kcopyd_work, do_work); mutex_unlock(&kcopyd_init_lock); return 0; } Loading @@ -644,8 +639,6 @@ static void kcopyd_exit(void) kcopyd_clients--; if (!kcopyd_clients) { jobs_exit(); destroy_workqueue(_kcopyd_wq); _kcopyd_wq = NULL; } mutex_unlock(&kcopyd_init_lock); } Loading @@ -662,15 +655,31 @@ int dm_kcopyd_client_create(unsigned int nr_pages, kc = kmalloc(sizeof(*kc), GFP_KERNEL); if (!kc) { r = -ENOMEM; kcopyd_exit(); return -ENOMEM; return r; } spin_lock_init(&kc->lock); spin_lock_init(&kc->job_lock); INIT_LIST_HEAD(&kc->complete_jobs); INIT_LIST_HEAD(&kc->io_jobs); INIT_LIST_HEAD(&kc->pages_jobs); INIT_WORK(&kc->kcopyd_work, do_work); kc->kcopyd_wq = create_singlethread_workqueue("kcopyd"); if (!kc->kcopyd_wq) { r = -ENOMEM; kfree(kc); kcopyd_exit(); return r; } kc->pages = NULL; kc->nr_pages = kc->nr_free_pages = 0; r = client_alloc_pages(kc, nr_pages); if (r) { destroy_workqueue(kc->kcopyd_wq); kfree(kc); kcopyd_exit(); return r; Loading @@ -680,6 +689,7 @@ int dm_kcopyd_client_create(unsigned int nr_pages, if (IS_ERR(kc->io_client)) { r = PTR_ERR(kc->io_client); client_free_pages(kc); destroy_workqueue(kc->kcopyd_wq); kfree(kc); kcopyd_exit(); return r; Loading @@ -699,6 +709,10 @@ void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc) /* Wait for completion of all jobs submitted by this client. */ wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs)); BUG_ON(!list_empty(&kc->complete_jobs)); BUG_ON(!list_empty(&kc->io_jobs)); BUG_ON(!list_empty(&kc->pages_jobs)); destroy_workqueue(kc->kcopyd_wq); dm_io_client_destroy(kc->io_client); client_free_pages(kc); client_del(kc); Loading Loading
drivers/md/kcopyd.c +73 −59 Original line number Diff line number Diff line Loading @@ -26,14 +26,6 @@ #include "kcopyd.h" #include "dm.h" static struct workqueue_struct *_kcopyd_wq; static struct work_struct _kcopyd_work; static void wake(void) { queue_work(_kcopyd_wq, &_kcopyd_work); } /*----------------------------------------------------------------- * Each kcopyd client has its own little pool of preallocated * pages for kcopyd io. Loading @@ -50,8 +42,30 @@ struct dm_kcopyd_client { wait_queue_head_t destroyq; atomic_t nr_jobs; struct workqueue_struct *kcopyd_wq; struct work_struct kcopyd_work; /* * We maintain three lists of jobs: * * i) jobs waiting for pages * ii) jobs that have pages, and are waiting for the io to be issued. * iii) jobs that have completed. * * All three of these are protected by job_lock. */ spinlock_t job_lock; struct list_head complete_jobs; struct list_head io_jobs; struct list_head pages_jobs; }; static void wake(struct dm_kcopyd_client *kc) { queue_work(kc->kcopyd_wq, &kc->kcopyd_work); } static struct page_list *alloc_pl(void) { struct page_list *pl; Loading Loading @@ -209,21 +223,6 @@ struct kcopyd_job { static struct kmem_cache *_job_cache; static mempool_t *_job_pool; /* * We maintain three lists of jobs: * * i) jobs waiting for pages * ii) jobs that have pages, and are waiting for the io to be issued. * iii) jobs that have completed. * * All three of these are protected by job_lock. */ static DEFINE_SPINLOCK(_job_lock); static LIST_HEAD(_complete_jobs); static LIST_HEAD(_io_jobs); static LIST_HEAD(_pages_jobs); static int jobs_init(void) { _job_cache = KMEM_CACHE(kcopyd_job, 0); Loading @@ -241,10 +240,6 @@ static int jobs_init(void) static void jobs_exit(void) { BUG_ON(!list_empty(&_complete_jobs)); BUG_ON(!list_empty(&_io_jobs)); BUG_ON(!list_empty(&_pages_jobs)); mempool_destroy(_job_pool); kmem_cache_destroy(_job_cache); _job_pool = NULL; Loading @@ -255,18 +250,19 @@ static void jobs_exit(void) * Functions to push and pop a job onto the head of a given job * list. */ static struct kcopyd_job *pop(struct list_head *jobs) static struct kcopyd_job *pop(struct list_head *jobs, struct dm_kcopyd_client *kc) { struct kcopyd_job *job = NULL; unsigned long flags; spin_lock_irqsave(&_job_lock, flags); spin_lock_irqsave(&kc->job_lock, flags); if (!list_empty(jobs)) { job = list_entry(jobs->next, struct kcopyd_job, list); list_del(&job->list); } spin_unlock_irqrestore(&_job_lock, flags); spin_unlock_irqrestore(&kc->job_lock, flags); return job; } Loading @@ -274,10 +270,11 @@ static struct kcopyd_job *pop(struct list_head *jobs) static void push(struct list_head *jobs, struct kcopyd_job *job) { unsigned long flags; struct dm_kcopyd_client *kc = job->kc; spin_lock_irqsave(&_job_lock, flags); spin_lock_irqsave(&kc->job_lock, flags); list_add_tail(&job->list, jobs); spin_unlock_irqrestore(&_job_lock, flags); spin_unlock_irqrestore(&kc->job_lock, flags); } /* Loading Loading @@ -310,6 +307,7 @@ static int run_complete_job(struct kcopyd_job *job) static void complete_io(unsigned long error, void *context) { struct kcopyd_job *job = (struct kcopyd_job *) context; struct dm_kcopyd_client *kc = job->kc; if (error) { if (job->rw == WRITE) Loading @@ -318,21 +316,21 @@ static void complete_io(unsigned long error, void *context) job->read_err = 1; if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) { push(&_complete_jobs, job); wake(); push(&kc->complete_jobs, job); wake(kc); return; } } if (job->rw == WRITE) push(&_complete_jobs, job); push(&kc->complete_jobs, job); else { job->rw = WRITE; push(&_io_jobs, job); push(&kc->io_jobs, job); } wake(); wake(kc); } /* Loading Loading @@ -369,7 +367,7 @@ static int run_pages_job(struct kcopyd_job *job) r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages); if (!r) { /* this job is ready for io */ push(&_io_jobs, job); push(&job->kc->io_jobs, job); return 0; } Loading @@ -384,12 +382,13 @@ static int run_pages_job(struct kcopyd_job *job) * Run through a list for as long as possible. Returns the count * of successful jobs. */ static int process_jobs(struct list_head *jobs, int (*fn) (struct kcopyd_job *)) static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc, int (*fn) (struct kcopyd_job *)) { struct kcopyd_job *job; int r, count = 0; while ((job = pop(jobs))) { while ((job = pop(jobs, kc))) { r = fn(job); Loading @@ -399,7 +398,7 @@ static int process_jobs(struct list_head *jobs, int (*fn) (struct kcopyd_job *)) job->write_err = (unsigned long) -1L; else job->read_err = 1; push(&_complete_jobs, job); push(&kc->complete_jobs, job); break; } Loading @@ -421,8 +420,11 @@ static int process_jobs(struct list_head *jobs, int (*fn) (struct kcopyd_job *)) /* * kcopyd does this every time it's woken up. */ static void do_work(struct work_struct *ignored) static void do_work(struct work_struct *work) { struct dm_kcopyd_client *kc = container_of(work, struct dm_kcopyd_client, kcopyd_work); /* * The order that these are called is *very* important. * complete jobs can free some pages for pages jobs. Loading @@ -430,9 +432,9 @@ static void do_work(struct work_struct *ignored) * list. io jobs call wake when they complete and it all * starts again. */ process_jobs(&_complete_jobs, run_complete_job); process_jobs(&_pages_jobs, run_pages_job); process_jobs(&_io_jobs, run_io_job); process_jobs(&kc->complete_jobs, kc, run_complete_job); process_jobs(&kc->pages_jobs, kc, run_pages_job); process_jobs(&kc->io_jobs, kc, run_io_job); } /* Loading @@ -442,9 +444,10 @@ static void do_work(struct work_struct *ignored) */ static void dispatch_job(struct kcopyd_job *job) { atomic_inc(&job->kc->nr_jobs); push(&_pages_jobs, job); wake(); struct dm_kcopyd_client *kc = job->kc; atomic_inc(&kc->nr_jobs); push(&kc->pages_jobs, job); wake(kc); } #define SUB_JOB_SIZE 128 Loading Loading @@ -625,15 +628,7 @@ static int kcopyd_init(void) return r; } _kcopyd_wq = create_singlethread_workqueue("kcopyd"); if (!_kcopyd_wq) { jobs_exit(); mutex_unlock(&kcopyd_init_lock); return -ENOMEM; } kcopyd_clients++; INIT_WORK(&_kcopyd_work, do_work); mutex_unlock(&kcopyd_init_lock); return 0; } Loading @@ -644,8 +639,6 @@ static void kcopyd_exit(void) kcopyd_clients--; if (!kcopyd_clients) { jobs_exit(); destroy_workqueue(_kcopyd_wq); _kcopyd_wq = NULL; } mutex_unlock(&kcopyd_init_lock); } Loading @@ -662,15 +655,31 @@ int dm_kcopyd_client_create(unsigned int nr_pages, kc = kmalloc(sizeof(*kc), GFP_KERNEL); if (!kc) { r = -ENOMEM; kcopyd_exit(); return -ENOMEM; return r; } spin_lock_init(&kc->lock); spin_lock_init(&kc->job_lock); INIT_LIST_HEAD(&kc->complete_jobs); INIT_LIST_HEAD(&kc->io_jobs); INIT_LIST_HEAD(&kc->pages_jobs); INIT_WORK(&kc->kcopyd_work, do_work); kc->kcopyd_wq = create_singlethread_workqueue("kcopyd"); if (!kc->kcopyd_wq) { r = -ENOMEM; kfree(kc); kcopyd_exit(); return r; } kc->pages = NULL; kc->nr_pages = kc->nr_free_pages = 0; r = client_alloc_pages(kc, nr_pages); if (r) { destroy_workqueue(kc->kcopyd_wq); kfree(kc); kcopyd_exit(); return r; Loading @@ -680,6 +689,7 @@ int dm_kcopyd_client_create(unsigned int nr_pages, if (IS_ERR(kc->io_client)) { r = PTR_ERR(kc->io_client); client_free_pages(kc); destroy_workqueue(kc->kcopyd_wq); kfree(kc); kcopyd_exit(); return r; Loading @@ -699,6 +709,10 @@ void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc) /* Wait for completion of all jobs submitted by this client. */ wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs)); BUG_ON(!list_empty(&kc->complete_jobs)); BUG_ON(!list_empty(&kc->io_jobs)); BUG_ON(!list_empty(&kc->pages_jobs)); destroy_workqueue(kc->kcopyd_wq); dm_io_client_destroy(kc->io_client); client_free_pages(kc); client_del(kc); Loading