root/branches/asyncio/src/client/usrint/aiocommon.c @ 9428

Revision 9428, 16.1 KB (checked in by sdsnyde, 10 months ago)

fixed small bug fixes in the aio-open state machine. pxfs_open(64) and pxfs_creat(64) have been tested and are working

Line 
1/*
2 * (C) 2011 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7#include "usrint.h"
8#include "posix-ops.h"
9#include "openfile-util.h"
10#include "iocommon.h"
11#include "aiocommon.h"
12
13/* prototypes */
14static void aiocommon_run_waiting_ops(void);
15static void aiocommon_run_op(struct pvfs_aiocb *p_cb);
16static void aiocommon_finish_op(struct pvfs_aiocb *p_cb);
17static void *aiocommon_progress(void *ptr);
18
19/* linked list variables used to implement aio structures */
20static struct qlist_head *aio_waiting_list = NULL;
21static struct qlist_head *aio_running_list = NULL;
22static gen_mutex_t aio_wait_list_mutex = GEN_MUTEX_INITIALIZER;
23
24/* PROGRESS THREAD VARIABLES */
25static pthread_t aio_progress_thread;
26static int aio_progress_status = PVFS_AIO_PROGRESS_IDLE;
27static int aio_num_ops_running = 0;
28static PVFS_sys_op_id aio_running_ops[PVFS_AIO_MAX_RUNNING] = {0};
29
30/* Initialization of PVFS AIO system */
31int aiocommon_init(void)
32{
33    /* initialize waiting, running, and finished lists for aio implementation */
34    aio_waiting_list = (struct qlist_head *)malloc(sizeof(struct qlist_head));
35    if (aio_waiting_list == NULL)
36    {
37        return -1;
38    }
39    INIT_QLIST_HEAD(aio_waiting_list);
40
41    aio_running_list = (struct qlist_head *)malloc(sizeof(struct qlist_head));
42    if (aio_running_list == NULL)
43    {
44        return -1;
45    }
46    INIT_QLIST_HEAD(aio_running_list);
47
48    gossip_debug(GOSSIP_USRINT_DEBUG, "Successfully initalized PVFS AIO inteface\n");
49
50    return 0;
51}
52
53void aiocommon_submit_op(struct pvfs_aiocb *p_cb)
54{
55    p_cb->error_code = EINPROGRESS;
56    gen_mutex_lock(&aio_wait_list_mutex);
57    qlist_add_tail(&(p_cb->link), aio_waiting_list);
58    if (aio_progress_status == PVFS_AIO_PROGRESS_IDLE)
59    {
60        pthread_create(&aio_progress_thread, NULL, aiocommon_progress, NULL);
61        aio_progress_status = PVFS_AIO_PROGRESS_RUNNING;
62    }
63    gen_mutex_unlock(&aio_wait_list_mutex);
64
65    return;
66}
67
68static void aiocommon_run_waiting_ops(void)
69{
70    struct qlist_head *next_op;
71    struct pvfs_aiocb *p_cb;
72
73    while(1)
74    {
75        gen_mutex_lock(&aio_wait_list_mutex);
76        if (aio_num_ops_running == PVFS_AIO_MAX_RUNNING ||
77            qlist_empty(aio_waiting_list))
78        {
79            gen_mutex_unlock(&aio_wait_list_mutex);
80            break;
81        }
82
83        next_op = qlist_pop(aio_waiting_list);
84        p_cb = qlist_entry(next_op, struct pvfs_aiocb, link);
85        gossip_debug(GOSSIP_USRINT_DEBUG, "Adding AIO CB %p to running list\n",
86                     p_cb);
87
88        gen_mutex_unlock(&aio_wait_list_mutex);
89        aiocommon_run_op(p_cb);
90    }
91
92    return;
93}
94
95static void aiocommon_run_op(struct pvfs_aiocb *p_cb)
96{
97    int rc = 0;
98    PVFS_credential *cred;
99
100    rc = iocommon_cred(&cred);
101   
102    switch(p_cb->op_code)
103    {
104        case PVFS_AIO_IO_OP:
105        {
106            rc = PVFS_Request_contiguous(p_cb->u.io.vector->iov_len, PVFS_BYTE,
107                                         &(p_cb->u.io.file_req));
108            if (rc < 0)
109            {
110                rc = -PVFS_ENOMEM;
111                break;
112            }
113            rc = pvfs_convert_iovec(p_cb->u.io.vector, 1,
114                                    &(p_cb->u.io.mem_req),
115                                    &(p_cb->u.io.sys_buf));
116            if (rc < 0)
117            {
118                rc = -PVFS_ENOMEM;
119                break;
120            }
121
122            rc = PVFS_isys_io(p_cb->u.io.pd->s->pvfs_ref,
123                              p_cb->u.io.file_req,
124                              p_cb->u.io.offset,
125                              p_cb->u.io.sys_buf,
126                              p_cb->u.io.mem_req,
127                              cred,
128                              &(p_cb->u.io.io_resp),
129                              p_cb->u.io.which,
130                              &(p_cb->op_id),
131                              p_cb->hints,
132                              (void *)p_cb);
133            break;
134        }
135        case PVFS_AIO_IOV_OP:
136        {
137            int i, size = 0;
138            for (i = 0; i < p_cb->u.io.count; i++)
139            {
140                size += p_cb->u.io.vector[i].iov_len;
141            }
142            rc = PVFS_Request_contiguous(size, PVFS_BYTE, &(p_cb->u.io.file_req));
143            if (rc < 0)
144            {
145                rc = -PVFS_ENOMEM;
146                break;
147            }
148            rc = pvfs_convert_iovec(p_cb->u.io.vector, p_cb->u.io.count,
149                                    &(p_cb->u.io.mem_req), &(p_cb->u.io.sys_buf));
150            if (rc < 0)
151            {
152                rc = -PVFS_ENOMEM;
153                break;
154            }
155
156            rc = PVFS_isys_io(p_cb->u.io.pd->s->pvfs_ref,
157                              p_cb->u.io.file_req,
158                              p_cb->u.io.offset,
159                              p_cb->u.io.sys_buf,
160                              p_cb->u.io.mem_req,
161                              cred,
162                              &(p_cb->u.io.io_resp),
163                              p_cb->u.io.which,
164                              &(p_cb->op_id),
165                              p_cb->hints,
166                              (void *)p_cb);
167            break;
168        }
169        case PVFS_AIO_OPEN_OP:
170        {
171            rc = PVFS_iaio_open(&(p_cb->u.open.pd),
172                                p_cb->u.open.path,
173                                p_cb->u.open.directory,
174                                p_cb->u.open.filename,
175                                p_cb->u.open.flags,
176                                p_cb->u.open.file_creation_param,
177                                p_cb->u.open.mode,
178                                p_cb->u.open.pdir,
179                                cred,
180                                &(p_cb->op_id),
181                                p_cb->hints,
182                                (void *)p_cb);
183            break;
184        }
185        case PVFS_AIO_RENAME_OP:
186        {
187            rc = PVFS_iaio_rename(p_cb->u.rename.oldpdir,
188                                  p_cb->u.rename.olddir,
189                                  p_cb->u.rename.oldname,
190                                  p_cb->u.rename.newpdir,
191                                  p_cb->u.rename.newdir,
192                                  p_cb->u.rename.newname,
193                                  cred,
194                                  &(p_cb->op_id),
195                                  p_cb->hints,
196                                  (void *)p_cb);
197            break;
198        }
199        case PVFS_AIO_STAT_OP:
200        {   
201            rc = PVFS_isys_getattr(p_cb->u.stat.pd->s->pvfs_ref,
202                                   p_cb->u.stat.mask,
203                                   cred,
204                                   &(p_cb->u.stat.getattr_resp),
205                                   &(p_cb->op_id),
206                                   p_cb->hints,
207                                   (void *)p_cb);
208            break;
209        }
210        case PVFS_AIO_STAT64_OP:
211        {
212            rc = PVFS_isys_getattr(p_cb->u.stat.pd->s->pvfs_ref,
213                                   p_cb->u.stat.mask,
214                                   cred,
215                                   &(p_cb->u.stat.getattr_resp),
216                                   &(p_cb->op_id),
217                                   p_cb->hints,
218                                   (void *)p_cb);
219            break;
220        }
221        default:
222        {
223            rc = -PVFS_EINVAL;
224            break;
225        }
226    }
227
228    if (rc < 0)
229    {
230        p_cb->error_code = rc;
231        aiocommon_finish_op(p_cb);
232    }
233    else if(rc >= 0 && p_cb->op_id == -1)
234    {
235        p_cb->error_code = 0;   
236        aiocommon_finish_op(p_cb);
237    }
238    else
239    {
240        /* the operation deferred completion */
241        gossip_debug(GOSSIP_USRINT_DEBUG, "AIO CB %p, DEFERRED\n", p_cb);
242        qlist_add_tail(&p_cb->link, aio_running_list);
243        aio_running_ops[aio_num_ops_running++] = p_cb->op_id;
244    }
245
246    return;
247}
248
249static void aiocommon_finish_op(struct pvfs_aiocb *p_cb)
250{
251    int status = 0;
252
253    switch (p_cb->op_code)
254    {
255        case PVFS_AIO_IO_OP:
256        {
257            if (p_cb->error_code < 0)
258            {
259                *(p_cb->u.io.bcnt) = -1;
260            }
261            else
262            {
263                *(p_cb->u.io.bcnt) = p_cb->u.io.io_resp.total_completed;
264                if (p_cb->u.io.advance_fp)
265                {
266                    gen_mutex_lock(&(p_cb->u.io.pd->s->lock));
267                    p_cb->u.io.pd->s->file_pointer += *(p_cb->u.io.bcnt);
268                    gen_mutex_unlock(&(p_cb->u.io.pd->s->lock));
269                }
270            }
271            free(p_cb->u.io.vector);
272            PVFS_Request_free(&(p_cb->u.io.mem_req));
273            PVFS_Request_free(&(p_cb->u.io.file_req));
274            break;
275        }
276        case PVFS_AIO_IOV_OP:
277        {
278            if (p_cb->error_code < 0)
279            {
280                *(p_cb->u.io.bcnt) = -1;
281            }
282            else
283            {
284                *(p_cb->u.io.bcnt) = p_cb->u.io.io_resp.total_completed;
285                gen_mutex_lock(&(p_cb->u.io.pd->s->lock));
286                p_cb->u.io.pd->s->file_pointer += *(p_cb->u.io.bcnt);
287                gen_mutex_unlock(&(p_cb->u.io.pd->s->lock));
288            }
289            PVFS_Request_free(&(p_cb->u.io.mem_req));
290            PVFS_Request_free(&(p_cb->u.io.file_req));
291            break;
292        }
293        case PVFS_AIO_OPEN_OP:
294        {
295            if (p_cb->error_code < 0)
296            {
297                *(p_cb->u.open.fd) = -1;
298            }
299            else
300            {
301                *(p_cb->u.open.fd) = p_cb->u.open.pd->fd;
302            }
303            break;
304        }
305        case PVFS_AIO_RENAME_OP:
306        {
307            free(p_cb->u.rename.olddir);
308            free(p_cb->u.rename.oldname);
309            free(p_cb->u.rename.newdir);
310            free(p_cb->u.rename.newname);
311            break;
312        }
313        case PVFS_AIO_STAT_OP:
314        {
315            PVFS_sys_attr attr = p_cb->u.stat.getattr_resp.attr;
316            struct stat *buf = (struct stat *)p_cb->u.stat.buf;
317
318            buf->st_dev = p_cb->u.stat.pd->s->pvfs_ref.fs_id;
319            buf->st_ino = p_cb->u.stat.pd->s->pvfs_ref.handle;
320            buf->st_mode = attr.perms;
321            if (attr.objtype == PVFS_TYPE_METAFILE)
322            {
323                buf->st_mode |= S_IFREG;
324            }
325            if (attr.objtype == PVFS_TYPE_DIRECTORY)
326            {
327                buf->st_mode |= S_IFDIR;
328            }
329            if (attr.objtype == PVFS_TYPE_SYMLINK)
330            {
331                buf->st_mode |= S_IFLNK;
332            }
333            buf->st_nlink = 1; /* PVFS does not allow hard links */
334            buf->st_uid = attr.owner;
335            buf->st_gid = attr.group;
336            buf->st_rdev = 0; /* no dev special files */
337            buf->st_size = attr.size;
338            buf->st_blksize = attr.blksize;
339            if (attr.blksize)
340            {
341                buf->st_blocks = (attr.size + (attr.blksize - 1)) / attr.blksize;
342            }
343            buf->st_atime = attr.atime;
344            buf->st_mtime = attr.mtime;
345            buf->st_ctime = attr.ctime;
346            break;
347        }
348        case PVFS_AIO_STAT64_OP:
349        {
350            PVFS_sys_attr attr = p_cb->u.stat.getattr_resp.attr;
351            struct stat64 *buf = (struct stat64 *)p_cb->u.stat.buf;
352
353            buf->st_dev = p_cb->u.stat.pd->s->pvfs_ref.fs_id;
354            buf->st_ino = p_cb->u.stat.pd->s->pvfs_ref.handle;
355            buf->st_mode = attr.perms;
356            if (attr.objtype == PVFS_TYPE_METAFILE)
357            {
358                buf->st_mode |= S_IFREG;
359            }
360            if (attr.objtype == PVFS_TYPE_DIRECTORY)
361            {
362                buf->st_mode |= S_IFDIR;
363            }
364            if (attr.objtype == PVFS_TYPE_SYMLINK)
365            {
366                buf->st_mode |= S_IFLNK;
367            }
368            buf->st_nlink = 1; /* PVFS does not allow hard links */
369            buf->st_uid = attr.owner;
370            buf->st_gid = attr.group;
371            buf->st_rdev = 0; /* no dev special files */
372            buf->st_size = attr.size;
373            buf->st_blksize = attr.blksize;
374            if (attr.blksize)
375            {
376                buf->st_blocks = (attr.size + (attr.blksize - 1)) / attr.blksize;
377            }
378            buf->st_atime = attr.atime;
379            buf->st_mtime = attr.mtime;
380            buf->st_ctime = attr.ctime;
381            break;
382        }
383        default:
384        {
385            break;
386        }
387    }
388
389    /* set the status with the error number, if an error occured */
390    if (p_cb->error_code < 0)
391    {
392        if (IS_PVFS_NON_ERRNO_ERROR(-(p_cb->error_code)))
393        {
394            status = EIO;
395        }
396        else if (IS_PVFS_ERROR(-(p_cb->error_code)))
397        {
398            status = PINT_errno_mapping[(-(p_cb->error_code)) & 0x7f];
399        }
400    }
401
402    if (p_cb->call_back_fn)
403    {
404        (*p_cb->call_back_fn)(p_cb->call_back_dat, status);
405    }
406
407    return;
408}
409
410static void *aiocommon_progress(void *ptr)
411{
412    int i, j;
413    int ret = 0;
414    int op_count = 0;
415    PVFS_sys_op_id ret_op_ids[PVFS_AIO_MAX_RUNNING];
416    PVFS_sys_op_id temp_running_ops[PVFS_AIO_MAX_RUNNING] = {0};
417    int err_code_array[PVFS_AIO_MAX_RUNNING] = {0};
418    struct pvfs_aiocb *pcb_array[PVFS_AIO_MAX_RUNNING] = {NULL};
419
420    pvfs_sys_init();
421    gossip_debug(GOSSIP_USRINT_DEBUG, "AIO progress thread starting up\n");
422
423    /* progress thread */
424    while (1)
425    {
426        /* run any queued async io operations */
427        aiocommon_run_waiting_ops();     
428
429        /* check to see if the progress thread should exit
430         * (no more waiting or running operations)
431         */
432        gen_mutex_lock(&aio_wait_list_mutex);
433        if (!aio_num_ops_running && qlist_empty(aio_waiting_list))
434        {
435            gossip_debug(GOSSIP_USRINT_DEBUG,
436                         "No AIO requests waiting, progress thread exiting\n");
437            aio_progress_status = PVFS_AIO_PROGRESS_IDLE;
438            gen_mutex_unlock(&aio_wait_list_mutex);
439            pthread_exit(NULL);
440        }
441        gen_mutex_unlock(&aio_wait_list_mutex);
442
443        /* call PVFS_sys_testsome() to force progress on "running" operations
444         * in the system.
445         * NOTE: the op_ids of the completed ops will be in ret_op_ids,
446         * the number of operations will be in op_count, the user pointers
447         * are stored in pcb_array, and the error codes are stored in
448         * err_code array.
449         */
450        memcpy(ret_op_ids, aio_running_ops,
451               (aio_num_ops_running * sizeof(PVFS_sys_op_id)));
452        op_count = aio_num_ops_running;
453        ret = PVFS_sys_testsome(ret_op_ids,
454                                &op_count,
455                                (void *)pcb_array,
456                                err_code_array,
457                                PVFS_AIO_DEFAULT_TIMEOUT_MS);
458
459        /* for each op returned */
460        for (i = 0; i < op_count; i++)
461        {
462            /* ignore completed items that do not have a user pointer
463             * (these are not aio control blocks)
464             */
465            if (pcb_array[i] == NULL) continue;
466
467            /* remove the cb from the running list */
468            qlist_del(&(pcb_array[i]->link));
469
470            pcb_array[i]->error_code = err_code_array[i];
471            aiocommon_finish_op(pcb_array[i]);
472
473            /* update the number of running cbs, and exit the thread
474             * if this number is 0
475             */
476            for (j = 0; j < aio_num_ops_running; j++)
477            {
478                if (aio_running_ops[j] == pcb_array[i]->op_id)
479                {
480                    free(pcb_array[i]);
481                    /* remove it from the running op_ids array */
482                    if (j > 0)
483                        memcpy(temp_running_ops, aio_running_ops,
484                               j * sizeof(PVFS_sys_op_id));
485                    if (j < aio_num_ops_running - 1)
486                        memcpy(temp_running_ops + j, aio_running_ops + j + 1,
487                               (aio_num_ops_running - 1 - j) * sizeof(PVFS_sys_op_id));
488                    memcpy(aio_running_ops, temp_running_ops,
489                           (PVFS_AIO_MAX_RUNNING * sizeof(PVFS_sys_op_id)));
490                    break;
491                }
492            }
493
494            aio_num_ops_running--;
495        }
496    }
497}
498
499/*
500 * Local variables:
501 *  c-indent-level: 4
502 *  c-basic-offset: 4
503 * End:
504 *
505 * vim: ts=8 sts=4 sw=4 expandtab
506 */
Note: See TracBrowser for help on using the browser.