root/branches/stable/src/io/trove/trove-dbpf/dbpf-bstream-direct.c @ 9247

Revision 9247, 51.8 KB (checked in by k, 14 months ago)

Support for F_NOCACHE fcntl on OS X

Line 
1/*
2 * (C) 2001 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7#include <stdlib.h>
8#include <unistd.h>
9#include <stdio.h>
10#include <sys/types.h>
11#include <sys/stat.h>
12#include <fcntl.h>
13#include <stdlib.h>
14#ifdef HAVE_MALLOC_H
15#include <malloc.h>
16#endif
17#include <assert.h>
18#include <errno.h>
19#include <string.h>
20
21#include "gossip.h"
22#include "pvfs2-debug.h"
23#include "trove.h"
24#include "trove-internal.h"
25#include "dbpf.h"
26#include "dbpf-op.h"
27#include "dbpf-op-queue.h"
28#include "dbpf-attr-cache.h"
29#include "dbpf-bstream.h"
30#include "dbpf-sync.h"
31#include "pint-mem.h"
32#include "pint-mgmt.h"
33#include "pint-context.h"
34#include "pint-op.h"
35
36static gen_mutex_t dbpf_update_size_lock = GEN_MUTEX_INITIALIZER;
37static gen_mutex_t grow_bstream_table_lock = GEN_MUTEX_INITIALIZER;
38
39typedef struct
40{
41    char *buffer;
42    TROVE_size size;
43    TROVE_offset offset;
44} dbpf_stream_extents_t;
45
46struct qhash_table *grow_bstream_table = NULL;
47
48struct grow_bstream_handle
49{
50    struct qlist_head hash_link;
51    gen_mutex_t handle_lock;
52    gen_mutex_t refcount_lock;
53    PVFS_handle handle;
54    int refcount;
55};
56
57static int dbpf_bstream_get_extents(
58    char **mem_offset_array,
59    TROVE_size *mem_size_array,
60    int mem_count,
61    TROVE_offset *stream_offset_array,
62    TROVE_size *stream_size_array,
63    int stream_count,
64    int *ext_count,
65    dbpf_stream_extents_t *extents);
66
67static int hash_handle_compare(
68    void *key,
69    struct qlist_head *link);
70
71static int hash_handle(
72    void *handle,
73    int table_size);
74
75static int grow_bstream_handle_table_init( int size );
76static int grow_bstream_handle_acquire_lock( TROVE_object_ref ref );
77static int grow_bstream_handle_release_lock( TROVE_object_ref ref );
78
79static size_t direct_aligned_write(int fd,
80                                    void *buf,
81                                    off_t buf_offset,
82                                    size_t size,
83                                    off_t write_offset,
84                                    off_t stream_size);
85
86static size_t direct_locked_write(int fd,
87                            void * buf,
88                            off_t buf_offset,
89                            size_t size,
90                            off_t write_offset,
91                            off_t stream_size);
92
93#if 0
94static size_t new_direct_write(int fd,
95                               void * buf,
96                               off_t buf_offset,
97                               size_t size,
98                               off_t write_offset,
99                               off_t stream_size);
100#endif
101
102static size_t direct_write(int fd,
103                           void * buf,
104                           off_t buf_offset,
105                           size_t size,
106                           off_t write_offset,
107                           off_t stream_size);
108
109static size_t direct_aligned_read(int fd,
110                                  void *buf,
111                                  off_t buf_offset,
112                                  size_t size,
113                                  off_t file_offset,
114                                  off_t stream_size);
115
116static size_t direct_locked_read(int fd,
117                            void * buf,
118                            off_t buf_offset,
119                            size_t size,
120                            off_t file_offset,
121                            off_t stream_size);
122
123static size_t direct_read(int fd,
124                          void * buf,
125                          off_t buf_offset,
126                          size_t size,
127                          off_t file_offset,
128                          off_t stream_size);
129
130#define BLOCK_SIZE 4096
131
132/* compute the mask of 1s that allows us to essentially throw away
133 * all bits less than the block size.
134 */
135#define BLOCK_MULTIPLES_MASK (~((uintptr_t) BLOCK_SIZE - 1))
136
137/* calculate the max offset that is a multiple of the block size but still
138 * less than or equal to requested offset passed in
139 */
140#define ALIGNED_OFFSET(__offset) (__offset & BLOCK_MULTIPLES_MASK)
141
142/* calculate the minimum size that is a multiple of the block size and
143 * still greater than or equal to the requested size
144 */
145#define ALIGNED_SIZE(__offset, __size) \
146    (((__offset + __size + BLOCK_SIZE - 1) \
147      & BLOCK_MULTIPLES_MASK) - ALIGNED_OFFSET(__offset))
148
149#define IS_ALIGNED_PTR(__ptr) \
150    ((((uintptr_t)__ptr) & BLOCK_MULTIPLES_MASK) == (uintptr_t)__ptr)
151
152extern PINT_manager_t io_thread_mgr;
153extern PINT_worker_id io_worker_id;
154extern PINT_queue_id io_queue_id;
155
156#if 0
157struct aligned_block
158{
159    void *ptr;
160    struct qlist_head link;
161};
162static struct aligned_block *blocks;
163static void *aligned_blocks_buffer;
164static QLIST_HEAD(aligned_blocks_unused);
165static QLIST_HEAD(aligned_blocks_used);
166static gen_mutex_t aligned_blocks_mutex = GEN_MUTEX_INITIALIZER;
167static int used_count;
168
169int dbpf_aligned_blocks_init(void);
170void * dbpf_aligned_block_get(void);
171int dbpf_aligned_block_put(void *ptr);
172int dbpf_aligned_blocks_finalize(void);
173#endif
174
175/**
176 * Perform an write in direct mode (no buffering).
177 *
178 * @param fd - The file descriptor of the bstream to do the write on.  THe
179 * file descriptor is required to opened with O_DIRECT.  In debug mode,
180 * the O_DIRECT option is checked.
181 *
182 * @param buf - the buffer containing the bytes to write to the bstream.  The
183 * buffer is required to be allocated with the correct alignment (to a block
184 * size of 512)
185 *
186 * @param buf_offset - the offset into the buffer that the write should start
187 *
188 * @param size - the size of bytes to write from the buffer to
189 * the file.
190 *
191 * @param write_offset - the offset into the bstream to start the write
192 *
193 * @param stream_size - the actual size of the bstream (might be stored
194 *                      elsewhere)
195 *
196 * @returns bytes written, otherwise a negative errno error code
197 */
198static size_t direct_aligned_write(int fd,
199                                    void *buf,
200                                    off_t buf_offset,
201                                    size_t size,
202                                    off_t write_offset,
203                                    off_t stream_size)
204{
205    int ret;
206
207#ifndef NDEBUG
208    /* if debug is enabled, check that fd was opened with O_DIRECT */
209#ifdef HAVE_OPEN_O_DIRECT
210    if(!(fcntl(fd, F_GETFL) & O_DIRECT))
211    {
212        return -EINVAL;
213    }
214#endif
215
216#ifdef HAVE_FNCTL_F_NOCACHE
217    if (!(fcntl(fd, F_GETFL) & F_NOCACHE))
218    {
219        return -EINVAL;
220    }
221#endif
222#endif
223
224    /* verify that the buffer is aligned properly */
225    assert(IS_ALIGNED_PTR(buf));
226
227    /* verify that the offset is aligned as well */
228    assert(ALIGNED_OFFSET(buf_offset) == buf_offset);
229
230    /* and the size */
231    assert(ALIGNED_SIZE(write_offset, size) == size);
232
233    /* and the offset into the file */
234    assert(ALIGNED_OFFSET(write_offset) == write_offset);
235
236    ret = dbpf_pwrite(fd, (((char *)buf) + buf_offset), size, write_offset);
237    if(ret < 0)
238    {
239        gossip_err(
240            "dbpf_direct_write: failed to perform aligned write\n");
241        return ret;
242    }
243
244    return ret;
245}
246
247/* static int writes_outstanding = 0;
248gen_mutex_t writes_lock = GEN_MUTEX_INITIALIZER; */
249
250static size_t direct_locked_write(int fd,
251                            void * buf,
252                            off_t buf_offset,
253                            size_t size,
254                            off_t write_offset,
255                            off_t stream_size)
256{
257    struct flock writelock;
258    int ret, write_ret;
259/*      struct timeval start, end; */
260
261    writelock.l_type = F_WRLCK;
262    writelock.l_whence = SEEK_SET;
263    writelock.l_start = (off_t)ALIGNED_OFFSET(write_offset);
264    writelock.l_len = (off_t)ALIGNED_SIZE(write_offset, size);
265    ret = fcntl(fd, F_SETLKW, &writelock);
266    if(ret < 0 && errno == EINTR)
267    {
268        gossip_err("%s: failed to lock flock before writing\n", __func__);
269        return -trove_errno_to_trove_error(errno);
270    }
271    writelock.l_type = F_UNLCK;
272
273    write_ret = direct_write(
274        fd, buf, buf_offset, size, write_offset, stream_size);
275
276    ret = fcntl(fd, F_SETLK, &writelock);
277    if (ret < 0)
278    {
279        gossip_err("%s: failed to unlock flock after writing\n", __func__);
280        return -trove_errno_to_trove_error (errno);
281    }
282
283#if 0
284    if(write_ret > 0)
285    {
286        if((write_offset + size) > stream_size)
287        {
288            ret = DBPF_RESIZE(fd, (write_offset + size));
289            if(ret < 0)
290            {
291                gossip_err("failed ftruncate of O_DIRECT fd to size: %d\n",
292                           (write_offset + size));
293                return -trove_errno_to_trove_error(errno);
294            }
295        }
296    }
297#endif
298
299    return write_ret;
300}
301
302#if 0
303static size_t new_direct_write(int fd,
304                               void * buf,
305                               off_t buf_offset,
306                               size_t size,
307                               off_t write_offset,
308                               off_t stream_size)
309{
310    size_t ret;
311    void *aligned_buf;
312    size_t aligned_size;
313    off_t aligned_offset, end_offset, aligned_end_offset;
314
315    aligned_size = ALIGNED_SIZE(write_offset, size);
316    aligned_offset = ALIGNED_OFFSET(write_offset);
317
318    /* if the buffer passed in, the offsets, and the size are all
319     * aligned properly, just pass through directly
320     */
321    if(IS_ALIGNED_PTR(buf) &&
322       ALIGNED_OFFSET(buf_offset) == buf_offset &&
323       aligned_size == size)
324    {
325        return direct_aligned_write(fd, buf, buf_offset,
326                                   size, write_offset, stream_size);
327    }
328
329    gossip_debug(GOSSIP_DIRECTIO_DEBUG,
330                 "requested write is not aligned, doing memcpy:\n\t"
331                 "buf: %p, "
332                 "buf_offset: %llu, "
333                 "size: %zu, \n\t"
334                 "write_offset: %llu, "
335                 "stream_size: %zu\n",
336                 buf,
337                 llu(buf_offset),
338                 size,
339                 llu(write_offset),
340                 stream_size);
341
342    aligned_buf = dbpf_aligned_block_get();
343    if(!aligned_buf)
344    {
345        return -ENOMEM;
346    }
347
348    /* Do read-modify-write on the ends of the buffer if
349     * the offsets and sizes aren't aligned properly
350     */
351    if(aligned_offset < write_offset)
352    {
353        ret = 0;
354        if(ALIGNED_SIZE(0, stream_size) > aligned_offset)
355        {
356            gossip_debug(GOSSIP_DIRECTIO_DEBUG, "Doing RMW at front\n");
357            /* read the first block */
358            ret = dbpf_pread(fd, aligned_buf, BLOCK_SIZE, aligned_offset);
359            if(ret < 0)
360            {
361                int pread_errno = errno;
362                gossip_err(
363                    "direct_memcpy_write: RMW failed at "
364                    "beginning of request\n");
365                dbpf_aligned_block_put(aligned_buf);
366
367                return -trove_errno_to_trove_error(pread_errno);
368            }
369        }
370        else
371        {
372            memset(aligned_buf, 0, BLOCK_SIZE);
373        }
374
375        memcpy(((char *)buf) - (write_offset - aligned_offset),
376               aligned_buf, (write_offset - aligned_offset));
377    }
378
379    end_offset = write_offset + size;
380    aligned_end_offset = aligned_offset + aligned_size;
381
382    if(aligned_end_offset > end_offset)
383    {
384        ret = 0;
385        if(ALIGNED_SIZE(0, stream_size) >= aligned_end_offset)
386        {
387            gossip_debug(GOSSIP_DIRECTIO_DEBUG, "Doing RMW at end\n");
388            ret = dbpf_pread(fd,
389                             aligned_buf,
390                             BLOCK_SIZE,
391                             aligned_end_offset - BLOCK_SIZE);
392            if(ret < 0)
393            {
394                int pread_errno = errno;
395                gossip_err(
396                    "direct_memcpy_write: RMW failed at end of request\n");
397                dbpf_aligned_block_put(aligned_buf);
398
399                return -trove_errno_to_trove_error(pread_errno);
400            }
401        }
402        else
403        {
404            memset(aligned_buf, 0, BLOCK_SIZE);
405        }
406
407        memcpy(((char *)buf) + size,
408               ((char *)aligned_buf) + (end_offset % BLOCK_SIZE),
409               (aligned_end_offset - end_offset));
410    }
411
412    ret = direct_aligned_write(
413        fd,
414        ((char *)buf) - (write_offset - aligned_offset), 0,
415        aligned_size, aligned_offset, stream_size);
416
417    dbpf_aligned_block_put(aligned_buf);
418
419    return size;
420}
421#endif
422
423static size_t direct_write(int fd,
424                           void * buf,
425                           off_t buf_offset,
426                           size_t size,
427                           off_t write_offset,
428                           off_t stream_size)
429{
430    size_t ret;
431    void * aligned_buf;
432    size_t aligned_size;
433    off_t aligned_offset, end_offset, aligned_end_offset;
434
435    aligned_size = ALIGNED_SIZE(write_offset, size);
436    aligned_offset = ALIGNED_OFFSET(write_offset);
437
438    /* if the buffer passed in, the offsets, and the size are all
439     * aligned properly, just pass through directly
440     */
441    if(IS_ALIGNED_PTR(buf) &&
442       ALIGNED_OFFSET(buf_offset) == buf_offset &&
443       aligned_size == size)
444    {
445        return direct_aligned_write(fd, buf, buf_offset,
446                                   size, write_offset, stream_size);
447    }
448
449    gossip_debug(GOSSIP_DIRECTIO_DEBUG,
450                 "requested write is not aligned, doing memcpy:\n\t"
451                 "buf: %p, "
452                 "buf_offset: %llu, "
453                 "size: %zu, \n\t"
454                 "write_offset: %llu, "
455                 "stream_size: %llu\n",
456                 buf,
457                 llu(buf_offset),
458                 size,
459                 llu(write_offset),
460                 llu(stream_size));
461
462    aligned_buf = PINT_mem_aligned_alloc(aligned_size, BLOCK_SIZE);
463    if(!aligned_buf)
464    {
465        return -ENOMEM;
466    }
467
468    /* Do read-modify-write on the ends of the buffer if
469     * the offsets and sizes aren't aligned properly
470     */
471    if(aligned_offset < write_offset)
472    {
473        ret = 0;
474        if(ALIGNED_SIZE(0, stream_size) > aligned_offset)
475        {
476            /* read the first block */
477            gossip_debug(GOSSIP_DIRECTIO_DEBUG, "Doing RMW at front\n");
478            ret = dbpf_pread(fd, aligned_buf, BLOCK_SIZE, aligned_offset);
479            if(ret < 0)
480            {
481                int pread_errno = errno;
482                gossip_err(
483                    "direct_memcpy_write: RMW failed at "
484                    "beginning of request\n");
485                PINT_mem_aligned_free(aligned_buf);
486
487                return -trove_errno_to_trove_error(pread_errno);
488            }
489        }
490        else
491        {
492            memset(aligned_buf, 0, BLOCK_SIZE);
493        }
494    }
495
496    end_offset = write_offset + size;
497    aligned_end_offset = aligned_offset + aligned_size;
498
499    if(aligned_end_offset > end_offset)
500    {
501        ret = 0;
502        if(ALIGNED_SIZE(0, stream_size) >= aligned_end_offset)
503        {
504            gossip_debug(GOSSIP_DIRECTIO_DEBUG, "Doing RMW at end\n");
505            ret = dbpf_pread(
506                fd,
507                ((char *)aligned_buf) + aligned_size - BLOCK_SIZE,
508                BLOCK_SIZE,
509                aligned_end_offset - BLOCK_SIZE);
510            if(ret < 0)
511            {
512                int pread_errno = errno;
513                gossip_err(
514                    "direct_memcpy_write: RMW failed at end of request\n");
515                PINT_mem_aligned_free(aligned_buf);
516
517                return -trove_errno_to_trove_error(pread_errno);
518            }
519        }
520        else
521        {
522            memset(((char *)aligned_buf) + aligned_size - BLOCK_SIZE,
523                   0, BLOCK_SIZE);
524        }
525    }
526
527    /* now we're read to memcpy the actual (unaligned) request into the
528     * aligned buffer
529     */
530    memcpy(((char *)aligned_buf) + (write_offset - aligned_offset),
531           ((char *)buf) + buf_offset, size);
532
533    ret = direct_aligned_write(fd, aligned_buf, 0,
534                                aligned_size, aligned_offset, stream_size);
535
536    PINT_mem_aligned_free(aligned_buf);
537
538    return (ret < 0) ? ret : size;
539}
540
541/**
542 * Perform a read in direct mode (no buffering).
543 *
544 * @param fd - The file descriptor of the bstream to do the read from.  The
545 * file descriptor is required to be opened with O_DIRECT.  In debug mode,
546 * the O_DIRECT option is checked, and if it doesn't exist on the open file
547 * descriptor, EINVAL is returned.
548 *
549 * @param buf - The buffer to read data into.  This function assumes that
550 * the buffer has been allocated with the correct alignment (i.e. to a block
551 * size of 512, using posix_memalign or such).
552 *
553 * @param buf_offset - The offset into the buffer that data is read.
554 *
555 * @param buf_size - The available size of the buffer
556 *
557 * @param file_offset - offset into the file to start the read
558 *
559 * @param request_size - number of bytes to read from the file
560 *
561 * @param stream_size - size of the file
562 *
563 * @return number of bytes read
564 */
565static size_t direct_aligned_read(int fd,
566                                   void * buf,
567                                   off_t buf_offset,
568                                   size_t size,
569                                   off_t file_offset,
570                                   off_t stream_size)
571{
572    int ret;
573
574    if(file_offset >= stream_size)
575    {
576        /* the offset is past EOF, return 0 bytes read */
577        return 0;
578    }
579
580#ifndef NDEBUG
581    /* if debug is enabled, check that fd was opened with O_DIRECT */
582#ifdef HAVE_OPEN_O_DIRECT
583    if(!(fcntl(fd, F_GETFL) & O_DIRECT))
584#elif HAVE_FNCTL_F_NOCACHE
585    if (!(fcntl(fd, F_GETFL) & F_NOCACHE))
586#else
587    if (0) //TODO: error? fall back to madvise?
588#endif
589    {
590        gossip_err("dbpf_direct_read: trying to do direct IO but file wasn't "
591                   "opened with O_DIRECT\n");
592        return -EINVAL;
593    }
594#endif
595
596    /* verify that stuff is aligned properly */
597    assert(IS_ALIGNED_PTR(buf));
598    assert(ALIGNED_OFFSET(buf_offset) == buf_offset);
599    assert(ALIGNED_SIZE(file_offset, size) == size);
600    assert(ALIGNED_OFFSET(file_offset) == file_offset);
601
602    ret = dbpf_pread(fd, (((char *)buf) + buf_offset), size, file_offset);
603    if(ret < 0)
604    {
605        gossip_err("dbpf_direct_read: failed to perform aligned read\n");
606        return -trove_errno_to_trove_error(errno);
607    }
608
609    return ret;
610}
611
612static size_t direct_locked_read(int fd,
613                           void * buf,
614                           off_t buf_offset,
615                           size_t size,
616                           off_t file_offset,
617                           off_t stream_size)
618{
619    int ret, read_ret;
620    struct flock readlock;
621
622    readlock.l_type = F_RDLCK;
623    readlock.l_whence = SEEK_SET;
624    readlock.l_start = (off_t)ALIGNED_OFFSET(file_offset);
625    readlock.l_len = (off_t)ALIGNED_SIZE(file_offset, size);
626    ret = fcntl(fd, F_SETLKW, &readlock);
627    if(ret < 0 && errno == EINTR)
628    {
629        return -trove_errno_to_trove_error(errno);
630    }
631    readlock.l_type = F_UNLCK;
632
633    read_ret = direct_read(fd, buf, buf_offset, size, file_offset, stream_size);
634
635    ret = fcntl(fd, F_SETLK, &readlock);
636    if(ret < 0)
637    {
638        return -trove_errno_to_trove_error(errno);
639    }
640
641    return read_ret;
642}
643
644static size_t direct_read(int fd,
645                           void * buf,
646                           off_t buf_offset,
647                           size_t size,
648                           off_t file_offset,
649                           off_t stream_size)
650{
651    void * aligned_buf;
652    off_t aligned_offset;
653    size_t aligned_size, read_size;
654    size_t ret;
655
656    if(file_offset > stream_size)
657    {
658        return 0;
659    }
660
661    read_size = size;
662    if(stream_size < (file_offset + size))
663    {
664        read_size = stream_size - file_offset;
665    }
666
667    aligned_offset = ALIGNED_OFFSET(file_offset);
668    aligned_size = ALIGNED_SIZE(file_offset, read_size);
669
670    if(IS_ALIGNED_PTR(buf) &&
671       ALIGNED_OFFSET(buf_offset) == buf_offset &&
672       aligned_size == read_size)
673    {
674        return direct_aligned_read(fd, buf, buf_offset, read_size,
675                                   file_offset, stream_size);
676    }
677
678    aligned_buf = PINT_mem_aligned_alloc(aligned_size, BLOCK_SIZE);
679    if(!aligned_buf)
680    {
681        return -ENOMEM;
682    }
683
684    ret = direct_aligned_read(fd, aligned_buf, 0, aligned_size,
685                               aligned_offset, stream_size);
686    if(ret < 0)
687    {
688        PINT_mem_aligned_free(aligned_buf);
689
690        return ret;
691    }
692
693    memcpy(((char *)buf) + buf_offset,
694           ((char *)aligned_buf) + (file_offset - aligned_offset),
695           read_size);
696
697    PINT_mem_aligned_free(aligned_buf);
698
699    return ret;
700}
701
702static int dbpf_bstream_direct_read_op_svc(void *ptr, PVFS_hint hint)
703{
704    int ret = -TROVE_EINVAL;
705    TROVE_object_ref ref;
706    TROVE_ds_attributes attr;
707    dbpf_queued_op_t *qop_p;
708    struct dbpf_bstream_rw_list_op *rw_op;
709    dbpf_stream_extents_t *stream_extents = NULL;
710    int i, extent_count;
711
712    rw_op = (struct dbpf_bstream_rw_list_op *)ptr;
713    qop_p = (dbpf_queued_op_t *)rw_op->queued_op_ptr;
714
715    ref.fs_id = qop_p->op.coll_p->coll_id;
716    ref.handle = qop_p->op.handle;
717
718    /* not in attribute cache.  get the size from dspace */
719    ret = dbpf_dspace_attr_get(qop_p->op.coll_p, ref, &attr);
720    if(ret != 0)
721    {
722        gossip_err("%s: failed to get size in dspace attr: (error=%d)\n",
723                   __func__, ret);
724        goto done;
725    }
726
727    ret = dbpf_bstream_get_extents(
728        rw_op->mem_offset_array,
729        rw_op->mem_size_array,
730        rw_op->mem_array_count,
731        rw_op->stream_offset_array,
732        rw_op->stream_size_array,
733        rw_op->stream_array_count,
734        &extent_count,
735        NULL);
736    if(ret != 0)
737    {
738        gossip_err("%s: failed to get bstream extents from offset/sizes: "
739                   "(error=%d)\n", __func__, ret);
740        goto done;
741    }
742
743    stream_extents = malloc(sizeof(*stream_extents) * extent_count);
744    if(!stream_extents)
745    {
746        return -TROVE_ENOMEM;
747    }
748
749    ret = dbpf_bstream_get_extents(
750        rw_op->mem_offset_array,
751        rw_op->mem_size_array,
752        rw_op->mem_array_count,
753        rw_op->stream_offset_array,
754        rw_op->stream_size_array,
755        rw_op->stream_array_count,
756        &extent_count,
757        stream_extents);
758    if(ret != 0)
759    {
760        gossip_err("%s: failed to get bstream extents from offset/sizes: "
761                   "(error=%d)\n", __func__, ret);
762        goto done;
763    }
764
765    for(i = 0; i < extent_count; ++ i)
766    {
767        ret = direct_locked_read(rw_op->open_ref.fd,
768                          stream_extents[i].buffer,
769                          0,
770                          stream_extents[i].size,
771                          stream_extents[i].offset,
772                          attr.u.datafile.b_size);
773        if(ret < 0)
774        {
775            ret = -trove_errno_to_trove_error(-ret);
776            gossip_err("%s: direct_locked_read failed: (error=%d)\n", __func__,
777                        ret);
778            goto done;
779        }
780    }
781
782    ret = DBPF_OP_COMPLETE;
783
784done:
785    if(stream_extents)
786    {
787        free(stream_extents);
788    }
789    dbpf_open_cache_put(&rw_op->open_ref);
790    return ret;
791}
792
793static int dbpf_bstream_direct_write_op_svc(void *ptr, PVFS_hint hint)
794{
795    int ret = -TROVE_EINVAL;
796    TROVE_object_ref ref;
797    TROVE_ds_attributes attr;
798    dbpf_stream_extents_t *stream_extents = NULL;
799    int i, extent_count;
800    struct dbpf_bstream_rw_list_op *rw_op;
801    dbpf_queued_op_t *qop_p;
802    PVFS_size eor = -1;
803    int sync_required = 0;
804
805    rw_op = (struct dbpf_bstream_rw_list_op *)ptr;
806    qop_p = (dbpf_queued_op_t *)rw_op->queued_op_ptr;
807
808    ref.fs_id = qop_p->op.coll_p->coll_id;
809    ref.handle = qop_p->op.handle;
810
811    ret = dbpf_bstream_get_extents(
812        rw_op->mem_offset_array,
813        rw_op->mem_size_array,
814        rw_op->mem_array_count,
815        rw_op->stream_offset_array,
816        rw_op->stream_size_array,
817        rw_op->stream_array_count,
818        &extent_count,
819        NULL);
820    if(ret != 0)
821    {
822        gossip_err("%s: failed to count extents from stream offset/sizes: "
823                   "(error=%d)\n", __func__, ret);
824        goto cache_put;
825    }
826
827    stream_extents = malloc(sizeof(*stream_extents) * extent_count);
828    if(!stream_extents)
829    {
830        ret = -TROVE_ENOMEM;
831        goto cache_put;
832    }
833
834    ret = dbpf_bstream_get_extents(
835        rw_op->mem_offset_array,
836        rw_op->mem_size_array,
837        rw_op->mem_array_count,
838        rw_op->stream_offset_array,
839        rw_op->stream_size_array,
840        rw_op->stream_array_count,
841        &extent_count,
842        stream_extents);
843    if(ret != 0)
844    {
845        gossip_err("%s: failed to get stream extents from stream offset/sizes: "
846                   "(error=%d)\n", __func__, ret);
847        goto cache_put;
848    }
849
850    if( grow_bstream_table == NULL )
851    {
852        ret = grow_bstream_handle_table_init( 1021 );
853        if( ret != 0 )
854        {
855            gossip_err("%s: failed to create grow_bstream_handle_table\n",
856                       __func__);
857            goto cache_put;
858        }
859    }
860
861    /* acquire a lock on this handle prior to getting the size to prevent
862     * a race condition between multiple writes getting the wrong size */
863    grow_bstream_handle_acquire_lock( ref );
864
865    ret = dbpf_dspace_attr_get(qop_p->op.coll_p, ref, &attr);
866    if(ret != 0)
867    {
868        gossip_err("%s: failed to get dspace attr for bstream: (error=%d)\n",
869                    __func__, ret);
870        grow_bstream_handle_release_lock( ref );
871        goto cache_put;
872    }
873
874    /* prior to writes see if we are growing the file, if not, release the
875     * lock for growing file size since we won't be updating the file size
876     * below */
877    for(i = 0; i < extent_count; ++ i)
878    {
879        if(eor < stream_extents[i].offset + stream_extents[i].size)
880        {
881            eor = stream_extents[i].offset + stream_extents[i].size;
882        }
883    }
884    if(eor <= attr.u.datafile.b_size)
885    {
886        /* file size is not growing so we do not need to hold the lock
887         * since we won't update the size attribute below */
888        grow_bstream_handle_release_lock( ref );
889    }
890   
891    *rw_op->out_size_p = 0;
892
893    for(i = 0; i < extent_count; ++ i)
894    {
895        ret = direct_locked_write(rw_op->open_ref.fd,
896                                  stream_extents[i].buffer,
897                                  0,
898                                  stream_extents[i].size,
899                                  stream_extents[i].offset,
900                                  attr.u.datafile.b_size);
901        if(ret < 0)
902        {
903            gossip_err("%s: failed to perform direct locked write: "
904                       "(error=%d)\n", __func__, ret);
905            if(eor > attr.u.datafile.b_size)
906            {
907                grow_bstream_handle_release_lock( ref );
908            }
909            goto cache_put;
910        }
911        /* did this calculation above
912         * if(eor < stream_extents[i].offset + stream_extents[i].size)
913         * {
914         *    eor = stream_extents[i].offset + stream_extents[i].size;
915         * }
916         */
917        *rw_op->out_size_p += ret;
918    }
919
920    if(eor > attr.u.datafile.b_size)
921    {
922        int outcount;
923
924        gen_mutex_lock(&dbpf_update_size_lock);
925        ret = dbpf_dspace_attr_get(qop_p->op.coll_p, ref, &attr);
926        if(ret != 0)
927        {
928            gossip_err("%s: failed to get size from dspace attr: (error=%d)\n",
929                       __func__, ret);
930            gen_mutex_unlock(&dbpf_update_size_lock);
931            grow_bstream_handle_release_lock( ref );
932            goto cache_put;
933        }
934
935        if(eor > attr.u.datafile.b_size)
936        {
937            /* set the size of the file */
938            attr.u.datafile.b_size = eor;
939            ret = dbpf_dspace_attr_set(qop_p->op.coll_p, ref, &attr);
940            if(ret != 0)
941            {
942                gossip_err("%s: failed to update size in dspace attr: "
943                           "(error=%d)\n", __func__, ret);
944                gen_mutex_unlock(&dbpf_update_size_lock);
945                grow_bstream_handle_release_lock( ref );
946                goto cache_put;
947            }
948            sync_required = 1;
949        }
950        gen_mutex_unlock(&dbpf_update_size_lock);
951
952        if(sync_required == 1)
953        {
954            gossip_debug(GOSSIP_DIRECTIO_DEBUG,
955                "directio updating size for handle %llu\n", llu(ref.handle));
956
957            dbpf_open_cache_put(&rw_op->open_ref);
958
959            /* If we updated the size, then convert cur_op into a setattr.
960             * Note that we are not actually going to perform a setattr.
961             * We just want the coalescing path to treat it like a setattr
962             * so that the size update is synced before we complete.
963             */
964            dbpf_queued_op_init(qop_p,
965                                DSPACE_SETATTR,
966                                ref.handle,
967                                qop_p->op.coll_p,
968                                dbpf_dspace_setattr_op_svc,
969                                qop_p->op.user_ptr,
970                                TROVE_SYNC,
971                                qop_p->op.context_id);
972            qop_p->op.state = OP_IN_SERVICE;
973            ret = dbpf_sync_coalesce(qop_p, 0, &outcount);
974            if(ret < 0)
975            {
976                gossip_err("%s: failed to coalesce size update in dspace "
977                           "attr: (error=%d)\n", __func__, ret);
978                grow_bstream_handle_release_lock( ref );
979                goto done;
980            }
981
982            ret = grow_bstream_handle_release_lock( ref );
983            if( ret != 0 )
984            {
985                gossip_debug(GOSSIP_DIRECTIO_DEBUG, "%s: failed to release "
986                             "grow_bstream_handle lock when not updating "
987                             "file size\n", __func__ );
988            }
989
990            ret = PINT_MGMT_OP_CONTINUE;
991            goto done;
992        }
993        else
994        {
995            /* still need to release the lock even thought we didn't update
996             * the size because the size calc prior to doing the writes told
997             * use we would */
998            ret = grow_bstream_handle_release_lock( ref );
999            if( ret != 0 )
1000            {
1001                gossip_debug(GOSSIP_DIRECTIO_DEBUG, "%s: failed to release "
1002                             "grow_bstream_handle lock when not updating "
1003                             "file size\n", __func__ );
1004            }
1005        }
1006    }
1007    /* if we don't try to update the size then we already released the
1008     * handle grow lock above */
1009
1010   ret = PINT_MGMT_OP_COMPLETED;
1011
1012cache_put:
1013    dbpf_open_cache_put(&rw_op->open_ref);
1014done:
1015    if(stream_extents)
1016    {
1017        free(stream_extents);
1018    }
1019    return ret;
1020}
1021
1022static int dbpf_bstream_direct_read_at(TROVE_coll_id coll_id,
1023                                       TROVE_handle handle,
1024                                       void *buffer,
1025                                       TROVE_size *inout_size_p,
1026                                       TROVE_offset offset,
1027                                       TROVE_ds_flags flags,
1028                                       TROVE_vtag_s *vtag,
1029                                       void *user_ptr,
1030                                       TROVE_context_id context_id,
1031                                       TROVE_op_id *out_op_id_p,
1032                                       PVFS_hint hints)
1033{
1034    return -TROVE_ENOSYS;
1035}
1036
1037static int dbpf_bstream_direct_write_at(TROVE_coll_id coll_id,
1038                                        TROVE_handle handle,
1039                                        void *buffer,
1040                                        TROVE_size *inout_size_p,
1041                                        TROVE_offset offset,
1042                                        TROVE_ds_flags flags,
1043                                        TROVE_vtag_s *vtag,
1044                                        void *user_ptr,
1045                                        TROVE_context_id context_id,
1046                                        TROVE_op_id *out_op_id_p,
1047                                        PVFS_hint hints)
1048{
1049    return -TROVE_ENOSYS;
1050}
1051
1052static int dbpf_bstream_direct_read_list(TROVE_coll_id coll_id,
1053                                         TROVE_handle handle,
1054                                         char **mem_offset_array,
1055                                         TROVE_size *mem_size_array,
1056                                         int mem_count,
1057                                         TROVE_offset *stream_offset_array,
1058                                         TROVE_size *stream_size_array,
1059                                         int stream_count,
1060                                         TROVE_size *out_size_p,
1061                                         TROVE_ds_flags flags,
1062                                         TROVE_vtag_s *vtag,
1063                                         void *user_ptr,
1064                                         TROVE_context_id context_id,
1065                                         TROVE_op_id *out_op_id_p,
1066                                         PVFS_hint hints)
1067{
1068
1069    dbpf_queued_op_t *q_op_p = NULL;
1070    struct dbpf_bstream_rw_list_op *op;
1071    struct dbpf_collection *coll_p = NULL;
1072    int ret;
1073
1074    coll_p = dbpf_collection_find_registered(coll_id);
1075    if (coll_p == NULL)
1076    {
1077        gossip_err("%s: failed to find collection with fsid %d\n",
1078                   __func__, coll_id);
1079        return -TROVE_EINVAL;
1080    }
1081
1082    q_op_p = dbpf_queued_op_alloc();
1083    if (q_op_p == NULL)
1084    {
1085        return -TROVE_ENOMEM;
1086    }
1087
1088    /* initialize all the common members */
1089    dbpf_queued_op_init(q_op_p,
1090                        BSTREAM_READ_LIST,
1091                        handle,
1092                        coll_p,
1093                        NULL,
1094                        user_ptr,
1095                        flags,
1096                        context_id);
1097    op = (struct dbpf_bstream_rw_list_op *)&q_op_p->op.u.b_rw_list;
1098
1099    /* initialize the op-specific members */
1100    op->stream_array_count = stream_count;
1101    op->stream_offset_array = stream_offset_array;
1102    op->stream_size_array = stream_size_array;
1103    op->out_size_p = out_size_p;
1104
1105    op->mem_array_count = mem_count;
1106    op->mem_offset_array = mem_offset_array;
1107    op->mem_size_array = mem_size_array;
1108    op->queued_op_ptr = q_op_p;
1109
1110    ret = dbpf_open_cache_get(
1111        coll_id, handle,
1112        DBPF_FD_DIRECT_READ,
1113        &op->open_ref);
1114    if(ret < 0)
1115    {
1116        if(ret == -TROVE_ENOENT)
1117        {
1118            /* We create the bstream lazily, so here we'll just assume the read
1119             * was done before writes to this bstream occured, and return
1120             * a successful read of size 0.
1121             */
1122            *out_size_p = 0;
1123            ret = DBPF_OP_COMPLETE;
1124        }
1125        dbpf_queued_op_free(q_op_p);
1126        return ret;
1127    }
1128
1129    *out_op_id_p = q_op_p->op.id;
1130    ret = PINT_manager_id_post(
1131        io_thread_mgr, q_op_p, &q_op_p->mgr_op_id,
1132        dbpf_bstream_direct_read_op_svc, op, NULL, io_queue_id);
1133    if(ret < 0)
1134    {
1135        gossip_err("%s: failed to post direct read op: (error=%d)\n",
1136                   __func__, ret);
1137        return ret;
1138    }
1139
1140    return DBPF_OP_CONTINUE;
1141}
1142
1143static int dbpf_bstream_direct_write_list(TROVE_coll_id coll_id,
1144                                          TROVE_handle handle,
1145                                          char **mem_offset_array,
1146                                          TROVE_size *mem_size_array,
1147                                          int mem_count,
1148                                          TROVE_offset *stream_offset_array,
1149                                          TROVE_size *stream_size_array,
1150                                          int stream_count,
1151                                          TROVE_size *out_size_p,
1152                                          TROVE_ds_flags flags,
1153                                          TROVE_vtag_s *vtag,
1154                                          void *user_ptr,
1155                                          TROVE_context_id context_id,
1156                                          TROVE_op_id *out_op_id_p,
1157                                          PVFS_hint hints)
1158{
1159
1160    dbpf_queued_op_t *q_op_p = NULL;
1161    struct dbpf_bstream_rw_list_op *op;
1162    struct dbpf_collection *coll_p = NULL;
1163    int ret;
1164
1165    coll_p = dbpf_collection_find_registered(coll_id);
1166    if (coll_p == NULL)
1167    {
1168        return -TROVE_EINVAL;
1169    }
1170
1171    q_op_p = dbpf_queued_op_alloc();
1172    if(!q_op_p)
1173    {
1174        return -TROVE_ENOMEM;
1175    }
1176    dbpf_queued_op_init(q_op_p,
1177                        BSTREAM_WRITE_LIST,
1178                        handle,
1179                        coll_p,
1180                        NULL,
1181                        user_ptr,
1182                        TROVE_SYNC,
1183                        context_id);
1184
1185    op = &q_op_p->op.u.b_rw_list;
1186
1187    /* initialize the op-specific members */
1188    op->stream_array_count = stream_count;
1189    op->stream_offset_array = stream_offset_array;
1190    op->stream_size_array = stream_size_array;
1191    op->out_size_p = out_size_p;
1192
1193    op->mem_array_count = mem_count;
1194    op->mem_offset_array = mem_offset_array;
1195    op->mem_size_array = mem_size_array;
1196    op->queued_op_ptr = q_op_p;
1197
1198    ret = dbpf_open_cache_get(
1199        coll_id, handle,
1200        DBPF_FD_DIRECT_WRITE,
1201        &op->open_ref);
1202    if(ret < 0)
1203    {
1204        dbpf_queued_op_free(q_op_p);
1205        return ret;
1206    }
1207
1208    *out_op_id_p = q_op_p->op.id;
1209
1210    gossip_debug(GOSSIP_DIRECTIO_DEBUG, "%s: queuing direct write operation\n",
1211                  __func__);
1212    PINT_manager_id_post(
1213        io_thread_mgr, q_op_p, &q_op_p->mgr_op_id,
1214        dbpf_bstream_direct_write_op_svc, op, NULL, io_queue_id);
1215
1216    return DBPF_OP_CONTINUE;
1217}
1218
1219static int dbpf_bstream_direct_resize_op_svc(struct dbpf_op *op_p)
1220{
1221    int ret;
1222    TROVE_ds_attributes attr;
1223    TROVE_object_ref ref;
1224    dbpf_queued_op_t *q_op_p;
1225    struct open_cache_ref open_ref;
1226    PVFS_size tmpsize;
1227
1228    q_op_p = (dbpf_queued_op_t *)op_p->u.b_resize.queued_op_ptr;
1229    ref.fs_id = op_p->coll_p->coll_id;
1230    ref.handle = op_p->handle;
1231
1232    gen_mutex_lock(&dbpf_update_size_lock);
1233    ret = dbpf_dspace_attr_get(op_p->coll_p, ref, &attr);
1234    if(ret != 0)
1235    {
1236        gen_mutex_unlock(&dbpf_update_size_lock);
1237        return ret;
1238    }
1239
1240    tmpsize = op_p->u.b_resize.size;
1241    attr.u.datafile.b_size = tmpsize;
1242
1243    ret = dbpf_dspace_attr_set(op_p->coll_p, ref, &attr);
1244    if(ret < 0)
1245    {
1246        gen_mutex_unlock(&dbpf_update_size_lock);
1247        return ret;
1248    }
1249    gen_mutex_unlock(&dbpf_update_size_lock);
1250
1251    /* setup op for sync coalescing */
1252    dbpf_queued_op_init(q_op_p,
1253                        DSPACE_SETATTR,
1254                        ref.handle,
1255                        q_op_p->op.coll_p,
1256                        dbpf_dspace_setattr_op_svc,
1257                        q_op_p->op.user_ptr,
1258                        TROVE_SYNC,
1259                        q_op_p->op.context_id);
1260    q_op_p->op.state = OP_IN_SERVICE;
1261
1262    /* truncate file after attributes are set */
1263    ret = dbpf_open_cache_get(
1264        op_p->coll_p->coll_id, op_p->handle,
1265        DBPF_FD_DIRECT_WRITE,
1266        &open_ref);
1267    if(ret < 0)
1268    {
1269        return ret;
1270    }
1271
1272    ret = DBPF_RESIZE(open_ref.fd, tmpsize);
1273    if(ret < 0)
1274    {
1275        return(ret);
1276    }
1277
1278    dbpf_open_cache_put(&open_ref);
1279
1280    return DBPF_OP_COMPLETE;
1281}
1282
1283static int dbpf_bstream_direct_resize(TROVE_coll_id coll_id,
1284                                      TROVE_handle handle,
1285                                      TROVE_size *inout_size_p,
1286                                      TROVE_ds_flags flags,
1287                                      TROVE_vtag_s *vtag,
1288                                      void *user_ptr,
1289                                      TROVE_context_id context_id,
1290                                      TROVE_op_id *out_op_id_p,
1291                                      PVFS_hint hints)
1292{
1293    dbpf_queued_op_t *q_op_p = NULL;
1294    struct dbpf_collection *coll_p = NULL;
1295
1296    coll_p = dbpf_collection_find_registered(coll_id);
1297    if (coll_p == NULL)
1298    {
1299        return -TROVE_EINVAL;
1300    }
1301
1302    q_op_p = dbpf_queued_op_alloc();
1303    if (q_op_p == NULL)
1304    {
1305        return -TROVE_ENOMEM;
1306    }
1307
1308    /* initialize all the common members */
1309    dbpf_queued_op_init(q_op_p,
1310                        BSTREAM_RESIZE,
1311                        handle,
1312                        coll_p,
1313                        dbpf_bstream_direct_resize_op_svc,
1314                        user_ptr,
1315                        flags,
1316                        context_id);
1317
1318    /* initialize the op-specific members */
1319    q_op_p->op.u.b_resize.size = *inout_size_p;
1320    q_op_p->op.u.b_resize.queued_op_ptr = q_op_p;
1321    *out_op_id_p = dbpf_queued_op_queue(q_op_p);
1322
1323    return 0;
1324}
1325
1326static int dbpf_bstream_direct_validate(TROVE_coll_id coll_id,
1327                                        TROVE_handle handle,
1328                                        TROVE_ds_flags flags,
1329                                        TROVE_vtag_s *vtag,
1330                                        void *user_ptr,
1331                                        TROVE_context_id context_id,
1332                                        TROVE_op_id *out_op_id_p,
1333                                        PVFS_hint hints)
1334{
1335    return -TROVE_ENOSYS;
1336}
1337
1338static int dbpf_bstream_direct_flush(TROVE_coll_id coll_id,
1339                                     TROVE_handle handle,
1340                                     TROVE_ds_flags flags,
1341                                     void *user_ptr,
1342                                     TROVE_context_id context_id,
1343                                     TROVE_op_id *out_op_id_p,
1344                                     PVFS_hint hints)
1345{
1346    return DBPF_OP_COMPLETE;
1347}
1348
1349static int dbpf_bstream_direct_cancel(
1350    TROVE_coll_id coll_id,
1351    TROVE_op_id cancel_id,
1352    TROVE_context_id context_id)
1353{
1354    dbpf_queued_op_t *op;
1355    int ret;
1356
1357    op = id_gen_fast_lookup(cancel_id);
1358    if(!op)
1359    {
1360        gossip_lerr("Invalid op-id to cancel\n");
1361        return -TROVE_EINVAL;
1362    }
1363
1364    ret = PINT_manager_cancel(io_thread_mgr, op->mgr_op_id);
1365    if(ret < 0)
1366    {
1367        return ret|PVFS_ERROR_TROVE;
1368    }
1369
1370    return ret;
1371}
1372
1373struct TROVE_bstream_ops dbpf_bstream_direct_ops =
1374{
1375    dbpf_bstream_direct_read_at,
1376    dbpf_bstream_direct_write_at,
1377    dbpf_bstream_direct_resize,
1378    dbpf_bstream_direct_validate,
1379    dbpf_bstream_direct_read_list,
1380    dbpf_bstream_direct_write_list,
1381    dbpf_bstream_direct_flush,
1382    dbpf_bstream_direct_cancel
1383};
1384
1385static int dbpf_bstream_get_extents(
1386    char **mem_offset_array,
1387    TROVE_size *mem_size_array,
1388    int mem_count,
1389    TROVE_offset *stream_offset_array,
1390    TROVE_size *stream_size_array,
1391    int stream_count,
1392    int *ext_count,
1393    dbpf_stream_extents_t *extents)
1394{
1395    int mct = 0, sct = 0, act = 0;
1396    int oom = 0, oos = 0;
1397    TROVE_size cur_mem_size = 0;
1398    char *cur_mem_off = NULL;
1399    char *ext_ptr = NULL;
1400    TROVE_size ext_size = 0, cur_stream_size = 0;
1401    TROVE_offset ext_off = 0, cur_stream_off = 0;
1402
1403    cur_mem_size = mem_size_array[mct];
1404    cur_mem_off = mem_offset_array[mct];
1405
1406    cur_stream_size = stream_size_array[sct];
1407    cur_stream_off = stream_offset_array[sct];
1408
1409    while (1)
1410    {
1411        /*
1412          determine if we're either out of memory (oom) regions, or
1413          out of stream (oos) regions
1414        */
1415        /* in many (all?) cases mem_count is 1, so oom will end up being 1 */
1416        oom = (((mct + 1) < mem_count) ? 0 : 1);
1417        oos = (((sct + 1) < stream_count) ? 0 : 1);
1418
1419        if (cur_mem_size == cur_stream_size)
1420        {
1421            /* consume both mem and stream regions */
1422            ext_size = cur_mem_size;
1423            ext_ptr = cur_mem_off;
1424            ext_off = cur_stream_off;
1425
1426            if (!oom)
1427            {
1428                cur_mem_size = mem_size_array[++mct];
1429                cur_mem_off  = mem_offset_array[mct];
1430            }
1431            else
1432            {
1433                cur_mem_size = 0;
1434            }
1435            if (!oos)
1436            {
1437                cur_stream_size = stream_size_array[++sct];
1438                cur_stream_off  = stream_offset_array[sct];
1439            }
1440            else
1441            {
1442                cur_stream_size = 0;
1443            }
1444        }
1445        else if (cur_mem_size < cur_stream_size)
1446        {
1447            /* consume mem region and update stream region */
1448            ext_size = cur_mem_size;
1449            ext_ptr = cur_mem_off;
1450            ext_off = cur_stream_off;
1451
1452            cur_stream_size -= cur_mem_size;
1453            cur_stream_off  += cur_mem_size;
1454
1455            if (!oom)
1456            {
1457                cur_mem_size = mem_size_array[++mct];
1458                cur_mem_off  = mem_offset_array[mct];
1459            }
1460            else
1461            {
1462                cur_mem_size = 0;
1463            }
1464        }
1465        else /* cur_mem_size > cur_stream_size */
1466        {
1467            /* consume stream region and update mem region */
1468            ext_size = cur_stream_size;
1469            ext_ptr = cur_mem_off;
1470            ext_off = cur_stream_off;
1471
1472            cur_mem_size -= cur_stream_size;
1473            cur_mem_off  += cur_stream_size;
1474
1475            if (!oos)
1476            {
1477                cur_stream_size = stream_size_array[++sct];
1478                cur_stream_off  = stream_offset_array[sct];
1479            }
1480            else
1481            {
1482                cur_stream_size = 0;
1483            }
1484        }
1485
1486        if(extents)
1487        {
1488            extents[act].buffer = ext_ptr;
1489            extents[act].offset = ext_off;
1490            extents[act].size =   ext_size;
1491        }
1492        act++;
1493
1494        /* process until there are no bytes left in the current piece */
1495        if ((oom && cur_mem_size == 0) || (oos && cur_stream_size == 0))
1496        {
1497            break;
1498        }
1499    }
1500
1501    /* return the number actually used */
1502    *ext_count = act;
1503    return 0;
1504}
1505
1506/* grow_bstream_handle_table_init()
1507 *
1508 * initialize the grow_bstream_table
1509 *
1510 * size: prime number of hash table size
1511 */
1512static int grow_bstream_handle_table_init( int size )
1513{
1514    gen_mutex_lock( &grow_bstream_table_lock );
1515    if( grow_bstream_table == NULL )
1516    {
1517        grow_bstream_table = qhash_init(hash_handle_compare, hash_handle, size);
1518        if( grow_bstream_table == NULL )
1519        {
1520            return -PVFS_ENOMEM;
1521        }
1522    }
1523    gen_mutex_unlock( &grow_bstream_table_lock );
1524    return 0;
1525}
1526
1527/* obtains a per-handle lock by locking an existing entry in the
1528 * grow_bstream_table or creating an entry and grabbing the lock.
1529 *
1530 * returns 0 on success and the handle lock held or an error
1531 */
1532static int grow_bstream_handle_acquire_lock( TROVE_object_ref ref )
1533{
1534    struct qlist_head *hash_link = NULL;
1535    struct grow_bstream_handle *grow_handle = NULL;
1536
1537    gen_mutex_lock( &grow_bstream_table_lock );
1538    if( grow_bstream_table == NULL )
1539    {
1540        gen_mutex_unlock( &grow_bstream_table_lock );
1541        return -PVFS_EINVAL;
1542    }
1543
1544    hash_link = qhash_search(grow_bstream_table, &(ref.handle) );
1545    if( hash_link )
1546    {
1547        grow_handle = qlist_entry( hash_link, struct grow_bstream_handle,
1548                                  hash_link);
1549    }
1550    else
1551    {
1552        grow_handle = calloc( 1, sizeof( struct grow_bstream_handle ));
1553        if( grow_handle == NULL )
1554        {
1555            gen_mutex_unlock( &grow_bstream_table_lock );
1556            gossip_err( "%s: failed to alloc memory\n", __func__);
1557            return -PVFS_ENOMEM;
1558        }
1559        grow_handle->handle = ref.handle;
1560        gen_mutex_init( &(grow_handle->handle_lock) );
1561        gen_mutex_init( &(grow_handle->refcount_lock) );
1562
1563        /* we're safe adding it and waiting on grabbing the lock because
1564         * we still have the lock on the table so no one else
1565         * should access this new member */
1566        qhash_add( grow_bstream_table, &(grow_handle->handle),
1567                   &(grow_handle->hash_link) );
1568    }
1569
1570    /* increment the number of things using the hash member */
1571    gen_mutex_lock( &(grow_handle->refcount_lock) );
1572    grow_handle->refcount++;
1573    gen_mutex_unlock( &(grow_handle->refcount_lock) );
1574
1575    gen_mutex_unlock( &grow_bstream_table_lock );
1576
1577    gen_mutex_lock( &(grow_handle->handle_lock));
1578
1579    return 0;
1580}
1581
1582static int grow_bstream_handle_release_lock( TROVE_object_ref ref )
1583{
1584    struct qlist_head *hash_link = NULL;
1585    struct grow_bstream_handle *grow_handle = NULL;
1586    int rcount = 0;
1587
1588    gen_mutex_lock( &grow_bstream_table_lock );
1589    if( grow_bstream_table == NULL )
1590    {
1591        gen_mutex_unlock( &grow_bstream_table_lock );
1592        return -PVFS_EINVAL;
1593    }
1594
1595    hash_link = qhash_search(grow_bstream_table, &(ref.handle) );
1596    if( hash_link )
1597    {
1598        grow_handle = qlist_entry(hash_link, struct grow_bstream_handle,
1599                                  hash_link);
1600
1601        gen_mutex_lock( &(grow_handle->refcount_lock) );
1602        rcount = --grow_handle->refcount;
1603        gen_mutex_unlock( &(grow_handle->refcount_lock) );
1604
1605        /* if we're the last reference remove it from the hash and free the
1606         * memory. may want to optimize this so we aren't continuously
1607         * alloc/free for each read */
1608        if( rcount == 0 )
1609        {
1610            gen_mutex_unlock( &(grow_handle->handle_lock));
1611            qhash_del(hash_link);
1612            gen_mutex_destroy( &(grow_handle->handle_lock) );
1613            gen_mutex_destroy( &(grow_handle->refcount_lock) );
1614            free( grow_handle );
1615        }
1616        else
1617        {
1618            gen_mutex_unlock( &(grow_handle->handle_lock));
1619        }
1620    }
1621    else
1622    {
1623       /* should have an entry, but if not just report it for debugging */
1624       gossip_debug(GOSSIP_DIRECTIO_DEBUG, "%s: no grow_handle entry when "
1625                    "trying to remove with refcount %d\n", __func__, rcount);
1626    }
1627    gen_mutex_unlock( &grow_bstream_table_lock );
1628    return 0;
1629}
1630
1631/* hash_handle()
1632 *
1633 * hash function for handles added to table
1634 * taken from src/server/request-scheduler/request-scheduler.c
1635 *
1636 * returns integer offset into table
1637 */
1638static int hash_handle(
1639    void *handle,
1640    int table_size)
1641{
1642    /* TODO: update this later with a better hash function,
1643     * depending on what handles look like, for now just modding
1644     *
1645     */
1646    unsigned long tmp = 0;
1647    PVFS_handle *real_handle = handle;
1648
1649    tmp += (*(real_handle));
1650    tmp = tmp % table_size;
1651
1652    return ((int) tmp);
1653}
1654
1655/* hash_handle_compare()
1656 *
1657 * performs a comparison of a hash table entro to a given key
1658 * (used for searching)
1659 * taken from src/server/request-scheduler/request-scheduler.c
1660 *
1661 * returns 1 if match found, 0 otherwise
1662 */
1663static int hash_handle_compare(
1664    void *key,
1665    struct qlist_head *link)
1666{
1667    struct grow_bstream_handle *my_handle;
1668    PVFS_handle *real_handle = key;
1669
1670    my_handle = qlist_entry(link, struct grow_bstream_handle, hash_link);
1671    if (my_handle->handle == *real_handle)
1672    {
1673        return (1);
1674    }
1675
1676    return (0);
1677}
1678
1679#if 0
1680int dbpf_aligned_blocks_init(void)
1681{
1682    int i;
1683
1684    aligned_blocks_buffer = PINT_mem_aligned_alloc(BLOCK_SIZE*256, BLOCK_SIZE);
1685    blocks = malloc(sizeof(*blocks) * 256);
1686    used_count = 0;
1687    gen_mutex_lock(&aligned_blocks_mutex);
1688    for(i = 0; i < 256; ++i)
1689    {
1690        blocks[i].ptr = ((char *)aligned_blocks_buffer) + (i*BLOCK_SIZE);
1691        qlist_add_tail(&(blocks[i].link), &aligned_blocks_unused);
1692    }
1693    gen_mutex_unlock(&aligned_blocks_mutex);
1694    return 0;
1695}
1696
1697int dbpf_aligned_blocks_finalize(void)
1698{
1699    free(blocks);
1700    PINT_mem_aligned_free(aligned_blocks_buffer);
1701    return 0;
1702}
1703
1704void *dbpf_aligned_block_get(void)
1705{
1706    void *ptr;
1707    struct aligned_block *ablock;
1708    gen_mutex_lock(&aligned_blocks_mutex);
1709    if(used_count > 255)
1710    {
1711        gossip_debug(GOSSIP_DIRECTIO_DEBUG, "ran out of aligned blocks: %d\n",
1712                     used_count);
1713        gen_mutex_unlock(&aligned_blocks_mutex);
1714        return NULL;
1715    }
1716    if(qlist_empty(&aligned_blocks_unused))
1717    {
1718        gossip_debug(GOSSIP_DIRECTIO_DEBUG,
1719                     "aligned_block_get: unused list empty.\n");
1720        gen_mutex_unlock(&aligned_blocks_mutex);
1721        return NULL;
1722    }
1723
1724    ablock = qlist_entry(aligned_blocks_unused.next, struct aligned_block,
1725                         link);
1726    qlist_del(&ablock->link);
1727    ptr = ablock->ptr;
1728    ablock->ptr = NULL;
1729    qlist_add_tail(&ablock->link, &aligned_blocks_used);
1730    ++used_count;
1731    gen_mutex_unlock(&aligned_blocks_mutex);
1732    return ptr;
1733}
1734
1735int dbpf_aligned_block_put(void *ptr)
1736{
1737    struct aligned_block *ablock;
1738
1739    gen_mutex_lock(&aligned_blocks_mutex);
1740    ablock = qlist_entry(aligned_blocks_used.next, struct aligned_block, link);
1741    qlist_del(&ablock->link);
1742    ablock->ptr = ptr;
1743    qlist_add_tail((&(ablock->link)), &aligned_blocks_unused);
1744    --used_count;
1745    gen_mutex_unlock(&aligned_blocks_mutex);
1746    return 0;
1747}
1748#endif
1749
1750/*
1751 * Local variables:
1752 *  c-indent-level: 4
1753 *  c-basic-offset: 4
1754 * End:
1755 *
1756 * vim: ts=8 sts=4 sw=4 expandtab
1757 */
Note: See TracBrowser for help on using the browser.