root/branches/pvfs-2-8-branch/src/io/trove/trove-dbpf/dbpf-bstream-direct.c @ 8402

Revision 8402, 43.2 KB (checked in by pcarns, 3 years ago)

merge patch submitted by Bart Taylor to correct an overflow in directio
method on 32 bit machines

Line 
1/*
2 * (C) 2001 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7#include <stdlib.h>
8#include <unistd.h>
9#include <stdio.h>
10#include <sys/types.h>
11#include <sys/stat.h>
12#include <fcntl.h>
13#include <stdlib.h>
14#ifdef HAVE_MALLOC_H
15#include <malloc.h>
16#endif
17#include <assert.h>
18#include <errno.h>
19#include <string.h>
20
21#include "gossip.h"
22#include "pvfs2-debug.h"
23#include "trove.h"
24#include "trove-internal.h"
25#include "dbpf.h"
26#include "dbpf-op.h"
27#include "dbpf-op-queue.h"
28#include "dbpf-attr-cache.h"
29#include "dbpf-bstream.h"
30#include "dbpf-sync.h"
31#include "pint-mem.h"
32#include "pint-mgmt.h"
33#include "pint-context.h"
34#include "pint-op.h"
35
36static gen_mutex_t dbpf_update_size_lock = GEN_MUTEX_INITIALIZER;
37
38typedef struct
39{
40    char *buffer;
41    TROVE_size size;
42    TROVE_offset offset;
43} dbpf_stream_extents_t;
44
45static int dbpf_bstream_get_extents(
46    char **mem_offset_array,
47    TROVE_size *mem_size_array,
48    int mem_count,
49    TROVE_offset *stream_offset_array,
50    TROVE_size *stream_size_array,
51    int stream_count,
52    int *ext_count,
53    dbpf_stream_extents_t *extents);
54
55static size_t direct_aligned_write(int fd,
56                                    void *buf,
57                                    off_t buf_offset,
58                                    size_t size,
59                                    off_t write_offset,
60                                    off_t stream_size);
61
62static size_t direct_locked_write(int fd,
63                            void * buf,
64                            off_t buf_offset,
65                            size_t size,
66                            off_t write_offset,
67                            off_t stream_size);
68
69#if 0
70static size_t new_direct_write(int fd,
71                               void * buf,
72                               off_t buf_offset,
73                               size_t size,
74                               off_t write_offset,
75                               off_t stream_size);
76#endif
77
78static size_t direct_write(int fd,
79                           void * buf,
80                           off_t buf_offset,
81                           size_t size,
82                           off_t write_offset,
83                           off_t stream_size);
84
85static size_t direct_aligned_read(int fd,
86                                  void *buf,
87                                  off_t buf_offset,
88                                  size_t size,
89                                  off_t file_offset,
90                                  off_t stream_size);
91
92static size_t direct_locked_read(int fd,
93                            void * buf,
94                            off_t buf_offset,
95                            size_t size,
96                            off_t file_offset,
97                            off_t stream_size);
98
99static size_t direct_read(int fd,
100                          void * buf,
101                          off_t buf_offset,
102                          size_t size,
103                          off_t file_offset,
104                          off_t stream_size);
105
106#define BLOCK_SIZE 4096
107
108/* compute the mask of 1s that allows us to essentially throw away
109 * all bits less than the block size.
110 */
111#define BLOCK_MULTIPLES_MASK (~((uintptr_t) BLOCK_SIZE - 1))
112#define BLOCK_MULTIPLES_MASK_OFFSET (~((off_t) BLOCK_SIZE - 1))
113
114/* calculate the max offset that is a multiple of the block size but still
115 * less than or equal to requested offset passed in
116 */
117#define ALIGNED_OFFSET(__offset) (__offset & BLOCK_MULTIPLES_MASK_OFFSET)
118
119/* calculate the minimum size that is a multiple of the block size and
120 * still greater than or equal to the requested size
121 */
122#define ALIGNED_SIZE(__offset, __size) \
123    (((__offset + __size + BLOCK_SIZE - 1) \
124      & BLOCK_MULTIPLES_MASK_OFFSET) - ALIGNED_OFFSET(__offset))
125
126#define IS_ALIGNED_PTR(__ptr) \
127    ((((uintptr_t)__ptr) & BLOCK_MULTIPLES_MASK) == (uintptr_t)__ptr)
128
129extern PINT_manager_t io_thread_mgr;
130extern PINT_worker_id io_worker_id;
131extern PINT_queue_id io_queue_id;
132
133#if 0
134struct aligned_block
135{
136    void *ptr;
137    struct qlist_head link;
138};
139static struct aligned_block *blocks;
140static void *aligned_blocks_buffer;
141static QLIST_HEAD(aligned_blocks_unused);
142static QLIST_HEAD(aligned_blocks_used);
143static gen_mutex_t aligned_blocks_mutex = GEN_MUTEX_INITIALIZER;
144static int used_count;
145
146int dbpf_aligned_blocks_init(void);
147void * dbpf_aligned_block_get(void);
148int dbpf_aligned_block_put(void *ptr);
149int dbpf_aligned_blocks_finalize(void);
150#endif
151
152/**
153 * Perform an write in direct mode (no buffering).
154 *
155 * @param fd - The file descriptor of the bstream to do the write on.  THe
156 * file descriptor is required to opened with O_DIRECT.  In debug mode,
157 * the O_DIRECT option is checked.
158 *
159 * @param buf - the buffer containing the bytes to write to the bstream.  The
160 * buffer is required to be allocated with the correct alignment (to a block
161 * size of 512)
162 *
163 * @param buf_offset - the offset into the buffer that the write should start
164 *
165 * @param size - the size of bytes to write from the buffer to
166 * the file.
167 *
168 * @param write_offset - the offset into the bstream to start the write
169 *
170 * @param stream_size - the actual size of the bstream (might be stored elsewhere)
171 *
172 * @returns bytes written, otherwise a negative errno error code
173 */
174static size_t direct_aligned_write(int fd,
175                                    void *buf,
176                                    off_t buf_offset,
177                                    size_t size,
178                                    off_t write_offset,
179                                    off_t stream_size)
180{
181    int ret;
182
183#ifndef NDEBUG
184    /* if debug is enabled, check that fd was opened with O_DIRECT */
185
186    if(!(fcntl(fd, F_GETFL) & O_DIRECT))
187    {
188        return -EINVAL;
189    }
190#endif
191
192    /* verify that the buffer is aligned properly */
193    assert(IS_ALIGNED_PTR(buf));
194
195    /* verify that the offset is aligned as well */
196    assert(ALIGNED_OFFSET(buf_offset) == buf_offset);
197
198    /* and the size */
199    assert(ALIGNED_SIZE(write_offset, size) == size);
200
201    /* and the offset into the file */
202    assert(ALIGNED_OFFSET(write_offset) == write_offset);
203
204    ret = dbpf_pwrite(fd, (((char *)buf) + buf_offset), size, write_offset);
205    if(ret < 0)
206    {
207        gossip_err(
208            "dbpf_direct_write: failed to perform aligned write\n");
209        return ret;
210    }
211
212    return ret;
213}
214
215/* static int writes_outstanding = 0;
216gen_mutex_t writes_lock = GEN_MUTEX_INITIALIZER; */
217
218static size_t direct_locked_write(int fd,
219                            void * buf,
220                            off_t buf_offset,
221                            size_t size,
222                            off_t write_offset,
223                            off_t stream_size)
224{
225    struct flock writelock;
226    int ret, write_ret;
227/*      struct timeval start, end; */
228
229    writelock.l_type = F_WRLCK;
230    writelock.l_whence = SEEK_SET;
231    writelock.l_start = (off_t)ALIGNED_OFFSET(write_offset);
232    writelock.l_len = (off_t)ALIGNED_SIZE(write_offset, size);
233    ret = fcntl(fd, F_SETLKW, &writelock);
234    if(ret < 0 && errno == EINTR)
235    {
236        gossip_err("%s: failed to lock flock before writing\n", __func__);
237        return -trove_errno_to_trove_error(errno);
238    }
239    writelock.l_type = F_UNLCK;
240
241    write_ret = direct_write(
242        fd, buf, buf_offset, size, write_offset, stream_size);
243
244    ret = fcntl(fd, F_SETLK, &writelock);
245    if (ret < 0)
246    {
247        gossip_err("%s: failed to unlock flock after writing\n", __func__);
248        return -trove_errno_to_trove_error (errno);
249    }
250
251#if 0
252    if(write_ret > 0)
253    {
254        if((write_offset + size) > stream_size)
255        {
256            ret = DBPF_RESIZE(fd, (write_offset + size));
257            if(ret < 0)
258            {
259                gossip_err("failed ftruncate of O_DIRECT fd to size: %d\n",
260                           (write_offset + size));
261                return -trove_errno_to_trove_error(errno);
262            }
263        }
264    }
265#endif
266
267    return write_ret;
268}
269
270#if 0
271static size_t new_direct_write(int fd,
272                               void * buf,
273                               off_t buf_offset,
274                               size_t size,
275                               off_t write_offset,
276                               off_t stream_size)
277{
278    size_t ret;
279    void *aligned_buf;
280    size_t aligned_size;
281    off_t aligned_offset, end_offset, aligned_end_offset;
282
283    aligned_size = ALIGNED_SIZE(write_offset, size);
284    aligned_offset = ALIGNED_OFFSET(write_offset);
285
286    /* if the buffer passed in, the offsets, and the size are all
287     * aligned properly, just pass through directly
288     */
289    if(IS_ALIGNED_PTR(buf) &&
290       ALIGNED_OFFSET(buf_offset) == buf_offset &&
291       aligned_size == size)
292    {
293        return direct_aligned_write(fd, buf, buf_offset,
294                                   size, write_offset, stream_size);
295    }
296
297    gossip_debug(GOSSIP_DIRECTIO_DEBUG,
298                 "requested write is not aligned, doing memcpy:\n\t"
299                 "buf: %p, "
300                 "buf_offset: %llu, "
301                 "size: %zu, \n\t"
302                 "write_offset: %llu, "
303                 "stream_size: %zu\n",
304                 buf,
305                 llu(buf_offset),
306                 size,
307                 llu(write_offset),
308                 stream_size);
309
310    aligned_buf = dbpf_aligned_block_get();
311    if(!aligned_buf)
312    {
313        return -ENOMEM;
314    }
315
316    /* Do read-modify-write on the ends of the buffer if
317     * the offsets and sizes aren't aligned properly
318     */
319    if(aligned_offset < write_offset)
320    {
321        ret = 0;
322        if(ALIGNED_SIZE(0, stream_size) > aligned_offset)
323        {
324            gossip_debug(GOSSIP_DIRECTIO_DEBUG, "Doing RMW at front\n");
325            /* read the first block */
326            ret = dbpf_pread(fd, aligned_buf, BLOCK_SIZE, aligned_offset);
327            if(ret < 0)
328            {
329                int pread_errno = errno;
330                gossip_err(
331                    "direct_memcpy_write: RMW failed at "
332                    "beginning of request\n");
333                dbpf_aligned_block_put(aligned_buf);
334
335                return -trove_errno_to_trove_error(pread_errno);
336            }
337        }
338        else
339        {
340            memset(aligned_buf, 0, BLOCK_SIZE);
341        }
342
343        memcpy(((char *)buf) - (write_offset - aligned_offset),
344               aligned_buf, (write_offset - aligned_offset));
345    }
346
347    end_offset = write_offset + size;
348    aligned_end_offset = aligned_offset + aligned_size;
349
350    if(aligned_end_offset > end_offset)
351    {
352        ret = 0;
353        if(ALIGNED_SIZE(0, stream_size) >= aligned_end_offset)
354        {
355            gossip_debug(GOSSIP_DIRECTIO_DEBUG, "Doing RMW at end\n");
356            ret = dbpf_pread(fd,
357                             aligned_buf,
358                             BLOCK_SIZE,
359                             aligned_end_offset - BLOCK_SIZE);
360            if(ret < 0)
361            {
362                int pread_errno = errno;
363                gossip_err(
364                    "direct_memcpy_write: RMW failed at end of request\n");
365                dbpf_aligned_block_put(aligned_buf);
366
367                return -trove_errno_to_trove_error(pread_errno);
368            }
369        }
370        else
371        {
372            memset(aligned_buf, 0, BLOCK_SIZE);
373        }
374
375        memcpy(((char *)buf) + size,
376               ((char *)aligned_buf) + (end_offset % BLOCK_SIZE),
377               (aligned_end_offset - end_offset));
378    }
379
380    ret = direct_aligned_write(
381        fd,
382        ((char *)buf) - (write_offset - aligned_offset), 0,
383        aligned_size, aligned_offset, stream_size);
384
385    dbpf_aligned_block_put(aligned_buf);
386
387    return size;
388}
389#endif
390
391static size_t direct_write(int fd,
392                           void * buf,
393                           off_t buf_offset,
394                           size_t size,
395                           off_t write_offset,
396                           off_t stream_size)
397{
398    size_t ret;
399    void * aligned_buf;
400    size_t aligned_size;
401    off_t aligned_offset, end_offset, aligned_end_offset;
402
403    aligned_size = ALIGNED_SIZE(write_offset, size);
404    aligned_offset = ALIGNED_OFFSET(write_offset);
405
406    /* if the buffer passed in, the offsets, and the size are all
407     * aligned properly, just pass through directly
408     */
409    if(IS_ALIGNED_PTR(buf) &&
410       ALIGNED_OFFSET(buf_offset) == buf_offset &&
411       aligned_size == size)
412    {
413        return direct_aligned_write(fd, buf, buf_offset,
414                                   size, write_offset, stream_size);
415    }
416
417    gossip_debug(GOSSIP_DIRECTIO_DEBUG,
418                 "requested write is not aligned, doing memcpy:\n\t"
419                 "buf: %p, "
420                 "buf_offset: %llu, "
421                 "size: %zu, \n\t"
422                 "write_offset: %llu, "
423                 "stream_size: %llu\n",
424                 buf,
425                 llu(buf_offset),
426                 size,
427                 llu(write_offset),
428                 llu(stream_size));
429
430    aligned_buf = PINT_mem_aligned_alloc(aligned_size, BLOCK_SIZE);
431    if(!aligned_buf)
432    {
433        return -ENOMEM;
434    }
435
436    /* Do read-modify-write on the ends of the buffer if
437     * the offsets and sizes aren't aligned properly
438     */
439    if(aligned_offset < write_offset)
440    {
441        ret = 0;
442        if(ALIGNED_SIZE(0, stream_size) > aligned_offset)
443        {
444            /* read the first block */
445            gossip_debug(GOSSIP_DIRECTIO_DEBUG, "Doing RMW at front\n");
446            ret = dbpf_pread(fd, aligned_buf, BLOCK_SIZE, aligned_offset);
447            if(ret < 0)
448            {
449                int pread_errno = errno;
450                gossip_err(
451                    "direct_memcpy_write: RMW failed at "
452                    "beginning of request\n");
453                PINT_mem_aligned_free(aligned_buf);
454
455                return -trove_errno_to_trove_error(pread_errno);
456            }
457        }
458        else
459        {
460            memset(aligned_buf, 0, BLOCK_SIZE);
461        }
462    }
463
464    end_offset = write_offset + size;
465    aligned_end_offset = aligned_offset + aligned_size;
466
467    if(aligned_end_offset > end_offset)
468    {
469        ret = 0;
470        if(ALIGNED_SIZE(0, stream_size) >= aligned_end_offset)
471        {
472            gossip_debug(GOSSIP_DIRECTIO_DEBUG, "Doing RMW at end\n");
473            ret = dbpf_pread(
474                fd,
475                ((char *)aligned_buf) + aligned_size - BLOCK_SIZE,
476                BLOCK_SIZE,
477                aligned_end_offset - BLOCK_SIZE);
478            if(ret < 0)
479            {
480                int pread_errno = errno;
481                gossip_err(
482                    "direct_memcpy_write: RMW failed at end of request\n");
483                PINT_mem_aligned_free(aligned_buf);
484
485                return -trove_errno_to_trove_error(pread_errno);
486            }
487        }
488        else
489        {
490            memset(((char *)aligned_buf) + aligned_size - BLOCK_SIZE,
491                   0, BLOCK_SIZE);
492        }
493    }
494
495    /* now we're read to memcpy the actual (unaligned) request into the
496     * aligned buffer
497     */
498    memcpy(((char *)aligned_buf) + (write_offset - aligned_offset),
499           ((char *)buf) + buf_offset, size);
500
501    ret = direct_aligned_write(fd, aligned_buf, 0,
502                                aligned_size, aligned_offset, stream_size);
503
504    PINT_mem_aligned_free(aligned_buf);
505
506    return (ret < 0) ? ret : size;
507}
508
509/**
510 * Perform a read in direct mode (no buffering).
511 *
512 * @param fd - The file descriptor of the bstream to do the read from.  The
513 * file descriptor is required to be opened with O_DIRECT.  In debug mode,
514 * the O_DIRECT option is checked, and if it doesn't exist on the open file
515 * descriptor, EINVAL is returned.
516 *
517 * @param buf - The buffer to read data into.  This function assumes that
518 * the buffer has been allocated with the correct alignment (i.e. to a block
519 * size of 512, using posix_memalign or such).
520 *
521 * @param buf_offset - The offset into the buffer that data is read.
522 *
523 * @param buf_size - The available size of the buffer
524 *
525 * @param file_offset - offset into the file to start the read
526 *
527 * @param request_size - number of bytes to read from the file
528 *
529 * @param stream_size - size of the file
530 *
531 * @return number of bytes read
532 */
533static size_t direct_aligned_read(int fd,
534                                   void * buf,
535                                   off_t buf_offset,
536                                   size_t size,
537                                   off_t file_offset,
538                                   off_t stream_size)
539{
540    int ret;
541
542    if(file_offset >= stream_size)
543    {
544        /* the offset is past EOF, return 0 bytes read */
545        return 0;
546    }
547
548#ifndef NDEBUG
549    /* if debug is enabled, check that fd was opened with O_DIRECT */
550
551    if(!(fcntl(fd, F_GETFL) & O_DIRECT))
552    {
553        gossip_err("dbpf_direct_read: trying to do direct IO but file wasn't "
554                   "opened with O_DIRECT\n");
555        return -EINVAL;
556    }
557#endif
558
559    /* verify that stuff is aligned properly */
560    assert(IS_ALIGNED_PTR(buf));
561    assert(ALIGNED_OFFSET(buf_offset) == buf_offset);
562    assert(ALIGNED_SIZE(file_offset, size) == size);
563    assert(ALIGNED_OFFSET(file_offset) == file_offset);
564
565    ret = dbpf_pread(fd, (((char *)buf) + buf_offset), size, file_offset);
566    if(ret < 0)
567    {
568        gossip_err("dbpf_direct_read: failed to perform aligned read\n");
569        return -trove_errno_to_trove_error(errno);
570    }
571
572    return ret;
573}
574
575static size_t direct_locked_read(int fd,
576                           void * buf,
577                           off_t buf_offset,
578                           size_t size,
579                           off_t file_offset,
580                           off_t stream_size)
581{
582    int ret, read_ret;
583    struct flock readlock;
584
585    readlock.l_type = F_RDLCK;
586    readlock.l_whence = SEEK_SET;
587    readlock.l_start = (off_t)ALIGNED_OFFSET(file_offset);
588    readlock.l_len = (off_t)ALIGNED_SIZE(file_offset, size);
589    ret = fcntl(fd, F_SETLKW, &readlock);
590    if(ret < 0 && errno == EINTR)
591    {
592        return -trove_errno_to_trove_error(errno);
593    }
594    readlock.l_type = F_UNLCK;
595
596    read_ret = direct_read(fd, buf, buf_offset, size, file_offset, stream_size);
597
598    ret = fcntl(fd, F_SETLK, &readlock);
599    if(ret < 0)
600    {
601        return -trove_errno_to_trove_error(errno);
602    }
603
604    return read_ret;
605}
606
607static size_t direct_read(int fd,
608                           void * buf,
609                           off_t buf_offset,
610                           size_t size,
611                           off_t file_offset,
612                           off_t stream_size)
613{
614    void * aligned_buf;
615    off_t aligned_offset;
616    size_t aligned_size, read_size;
617    size_t ret;
618
619    if(file_offset > stream_size)
620    {
621        return 0;
622    }
623
624    read_size = size;
625    if(stream_size < (file_offset + size))
626    {
627        read_size = stream_size - file_offset;
628    }
629
630    aligned_offset = ALIGNED_OFFSET(file_offset);
631    aligned_size = ALIGNED_SIZE(file_offset, read_size);
632
633    if(IS_ALIGNED_PTR(buf) &&
634       ALIGNED_OFFSET(buf_offset) == buf_offset &&
635       aligned_size == read_size)
636    {
637        return direct_aligned_read(fd, buf, buf_offset, read_size,
638                                   file_offset, stream_size);
639    }
640
641    aligned_buf = PINT_mem_aligned_alloc(aligned_size, BLOCK_SIZE);
642    if(!aligned_buf)
643    {
644        return -ENOMEM;
645    }
646
647    ret = direct_aligned_read(fd, aligned_buf, 0, aligned_size,
648                               aligned_offset, stream_size);
649    if(ret < 0)
650    {
651        PINT_mem_aligned_free(aligned_buf);
652
653        return ret;
654    }
655
656    memcpy(((char *)buf) + buf_offset,
657           ((char *)aligned_buf) + (file_offset - aligned_offset),
658           read_size);
659
660    PINT_mem_aligned_free(aligned_buf);
661
662    return ret;
663}
664
665static int dbpf_bstream_direct_read_op_svc(void *ptr, PVFS_hint hint)
666{
667    int ret = -TROVE_EINVAL;
668    TROVE_object_ref ref;
669    TROVE_ds_attributes attr;
670    dbpf_queued_op_t *qop_p;
671    struct dbpf_bstream_rw_list_op *rw_op;
672    dbpf_stream_extents_t *stream_extents = NULL;
673    int i, extent_count;
674
675    rw_op = (struct dbpf_bstream_rw_list_op *)ptr;
676    qop_p = (dbpf_queued_op_t *)rw_op->queued_op_ptr;
677
678    ref.fs_id = qop_p->op.coll_p->coll_id;
679    ref.handle = qop_p->op.handle;
680
681    /* not in attribute cache.  get the size from dspace */
682    ret = dbpf_dspace_attr_get(qop_p->op.coll_p, ref, &attr);
683    if(ret != 0)
684    {
685        gossip_err("%s: failed to get size in dspace attr: (error=%d)\n", __func__, ret);
686        goto done;
687    }
688
689    ret = dbpf_bstream_get_extents(
690        rw_op->mem_offset_array,
691        rw_op->mem_size_array,
692        rw_op->mem_array_count,
693        rw_op->stream_offset_array,
694        rw_op->stream_size_array,
695        rw_op->stream_array_count,
696        &extent_count,
697        NULL);
698    if(ret != 0)
699    {
700        gossip_err("%s: failed to get bstream extents from offset/sizes: (error=%d)\n", __func__, ret);
701        goto done;
702    }
703
704    stream_extents = malloc(sizeof(*stream_extents) * extent_count);
705    if(!stream_extents)
706    {
707        return -TROVE_ENOMEM;
708    }
709
710    ret = dbpf_bstream_get_extents(
711        rw_op->mem_offset_array,
712        rw_op->mem_size_array,
713        rw_op->mem_array_count,
714        rw_op->stream_offset_array,
715        rw_op->stream_size_array,
716        rw_op->stream_array_count,
717        &extent_count,
718        stream_extents);
719    if(ret != 0)
720    {
721        gossip_err("%s: failed to get bstream extents from offset/sizes: (error=%d)\n", __func__, ret);
722        goto done;
723    }
724
725    for(i = 0; i < extent_count; ++ i)
726    {
727        ret = direct_locked_read(rw_op->open_ref.fd,
728                          stream_extents[i].buffer,
729                          0,
730                          stream_extents[i].size,
731                          stream_extents[i].offset,
732                          attr.u.datafile.b_size);
733        if(ret < 0)
734        {
735            ret = -trove_errno_to_trove_error(-ret);
736            gossip_err("%s: direct_locked_read failed: (error=%d)\n", __func__, ret);
737            goto done;
738        }
739    }
740
741    ret = DBPF_OP_COMPLETE;
742
743done:
744    if(stream_extents)
745    {
746        free(stream_extents);
747    }
748    dbpf_open_cache_put(&rw_op->open_ref);
749    return ret;
750}
751
752static int dbpf_bstream_direct_write_op_svc(void *ptr, PVFS_hint hint)
753{
754    int ret = -TROVE_EINVAL;
755    TROVE_object_ref ref;
756    TROVE_ds_attributes attr;
757    dbpf_stream_extents_t *stream_extents = NULL;
758    int i, extent_count;
759    struct dbpf_bstream_rw_list_op *rw_op;
760    dbpf_queued_op_t *qop_p;
761    PVFS_size eor = -1;
762    int sync_required = 0;
763
764    rw_op = (struct dbpf_bstream_rw_list_op *)ptr;
765    qop_p = (dbpf_queued_op_t *)rw_op->queued_op_ptr;
766
767    ref.fs_id = qop_p->op.coll_p->coll_id;
768    ref.handle = qop_p->op.handle;
769
770    ret = dbpf_bstream_get_extents(
771        rw_op->mem_offset_array,
772        rw_op->mem_size_array,
773        rw_op->mem_array_count,
774        rw_op->stream_offset_array,
775        rw_op->stream_size_array,
776        rw_op->stream_array_count,
777        &extent_count,
778        NULL);
779    if(ret != 0)
780    {
781        gossip_err("%s: failed to count extents from stream offset/sizes: (error=%d)\n", __func__, ret);
782        goto cache_put;
783    }
784
785    stream_extents = malloc(sizeof(*stream_extents) * extent_count);
786    if(!stream_extents)
787    {
788        ret = -TROVE_ENOMEM;
789        goto cache_put;
790    }
791
792    ret = dbpf_bstream_get_extents(
793        rw_op->mem_offset_array,
794        rw_op->mem_size_array,
795        rw_op->mem_array_count,
796        rw_op->stream_offset_array,
797        rw_op->stream_size_array,
798        rw_op->stream_array_count,
799        &extent_count,
800        stream_extents);
801    if(ret != 0)
802    {
803        gossip_err("%s: failed to get stream extents from stream offset/sizes: (error=%d)\n", __func__, ret);
804        goto cache_put;
805    }
806
807    ret = dbpf_dspace_attr_get(qop_p->op.coll_p, ref, &attr);
808    if(ret != 0)
809    {
810        gossip_err("%s: failed to get dspace attr for bstream: (error=%d)\n", __func__, ret);
811        goto cache_put;
812    }
813
814    *rw_op->out_size_p = 0;
815    for(i = 0; i < extent_count; ++ i)
816    {
817        ret = direct_locked_write(rw_op->open_ref.fd,
818                                  stream_extents[i].buffer,
819                                  0,
820                                  stream_extents[i].size,
821                                  stream_extents[i].offset,
822                                  attr.u.datafile.b_size);
823        if(ret < 0)
824        {
825            gossip_err("%s: failed to perform direct locked write: (error=%d)\n", __func__, ret);
826            goto cache_put;
827        }
828
829        if(eor < stream_extents[i].offset + stream_extents[i].size)
830        {
831            eor = stream_extents[i].offset + stream_extents[i].size;
832        }
833
834        *rw_op->out_size_p += ret;
835    }
836
837    if(eor > attr.u.datafile.b_size)
838    {
839        int outcount;
840
841        gen_mutex_lock(&dbpf_update_size_lock);
842        ret = dbpf_dspace_attr_get(qop_p->op.coll_p, ref, &attr);
843        if(ret != 0)
844        {
845            gossip_err("%s: failed to get size from dspace attr: (error=%d)\n", __func__, ret);
846            gen_mutex_unlock(&dbpf_update_size_lock);
847            goto cache_put;
848        }
849
850        if(eor > attr.u.datafile.b_size)
851        {
852            /* set the size of the file */
853            attr.u.datafile.b_size = eor;
854            ret = dbpf_dspace_attr_set(qop_p->op.coll_p, ref, &attr);
855            if(ret != 0)
856            {
857                gossip_err("%s: failed to update size in dspace attr: (error=%d)\n", __func__, ret);
858                gen_mutex_unlock(&dbpf_update_size_lock);
859                goto cache_put;
860            }
861            sync_required = 1;
862        }
863        gen_mutex_unlock(&dbpf_update_size_lock);
864
865        if(sync_required == 1)
866        {
867            gossip_debug(GOSSIP_DIRECTIO_DEBUG,
868                "directio updating size for handle %llu\n", llu(ref.handle));
869
870            dbpf_open_cache_put(&rw_op->open_ref);
871
872            /* If we updated the size, then convert cur_op into a setattr.
873             * Note that we are not actually going to perform a setattr.
874             * We just want the coalescing path to treat it like a setattr
875             * so that the size update is synced before we complete.
876             */
877            dbpf_queued_op_init(qop_p,
878                                DSPACE_SETATTR,
879                                ref.handle,
880                                qop_p->op.coll_p,
881                                dbpf_dspace_setattr_op_svc,
882                                qop_p->op.user_ptr,
883                                TROVE_SYNC,
884                                qop_p->op.context_id);
885            qop_p->op.state = OP_IN_SERVICE;
886            ret = dbpf_sync_coalesce(qop_p, 0, &outcount);
887            if(ret < 0)
888            {
889                gossip_err("%s: failed to coalesce size update in dspace attr: (error=%d)\n", __func__, ret);
890                goto done;
891            }
892
893            ret = PINT_MGMT_OP_CONTINUE;
894            goto done;
895        }
896    }
897
898    ret = PINT_MGMT_OP_COMPLETED;
899
900cache_put:
901    dbpf_open_cache_put(&rw_op->open_ref);
902done:
903    if(stream_extents)
904    {
905        free(stream_extents);
906    }
907    return ret;
908}
909
910static int dbpf_bstream_direct_read_at(TROVE_coll_id coll_id,
911                                       TROVE_handle handle,
912                                       void *buffer,
913                                       TROVE_size *inout_size_p,
914                                       TROVE_offset offset,
915                                       TROVE_ds_flags flags,
916                                       TROVE_vtag_s *vtag,
917                                       void *user_ptr,
918                                       TROVE_context_id context_id,
919                                       TROVE_op_id *out_op_id_p,
920                                       PVFS_hint hints)
921{
922    return -TROVE_ENOSYS;
923}
924
925static int dbpf_bstream_direct_write_at(TROVE_coll_id coll_id,
926                                        TROVE_handle handle,
927                                        void *buffer,
928                                        TROVE_size *inout_size_p,
929                                        TROVE_offset offset,
930                                        TROVE_ds_flags flags,
931                                        TROVE_vtag_s *vtag,
932                                        void *user_ptr,
933                                        TROVE_context_id context_id,
934                                        TROVE_op_id *out_op_id_p,
935                                        PVFS_hint hints)
936{
937    return -TROVE_ENOSYS;
938}
939
940static int dbpf_bstream_direct_read_list(TROVE_coll_id coll_id,
941                                         TROVE_handle handle,
942                                         char **mem_offset_array,
943                                         TROVE_size *mem_size_array,
944                                         int mem_count,
945                                         TROVE_offset *stream_offset_array,
946                                         TROVE_size *stream_size_array,
947                                         int stream_count,
948                                         TROVE_size *out_size_p,
949                                         TROVE_ds_flags flags,
950                                         TROVE_vtag_s *vtag,
951                                         void *user_ptr,
952                                         TROVE_context_id context_id,
953                                         TROVE_op_id *out_op_id_p,
954                                         PVFS_hint hints)
955{
956
957    dbpf_queued_op_t *q_op_p = NULL;
958    struct dbpf_bstream_rw_list_op *op;
959    struct dbpf_collection *coll_p = NULL;
960    int ret;
961
962    coll_p = dbpf_collection_find_registered(coll_id);
963    if (coll_p == NULL)
964    {
965        gossip_err("%s: failed to find collection with fsid %d\n", __func__, coll_id);
966        return -TROVE_EINVAL;
967    }
968
969    q_op_p = dbpf_queued_op_alloc();
970    if (q_op_p == NULL)
971    {
972        return -TROVE_ENOMEM;
973    }
974
975    /* initialize all the common members */
976    dbpf_queued_op_init(q_op_p,
977                        BSTREAM_READ_LIST,
978                        handle,
979                        coll_p,
980                        NULL,
981                        user_ptr,
982                        flags,
983                        context_id);
984    op = (struct dbpf_bstream_rw_list_op *)&q_op_p->op.u.b_rw_list;
985
986    /* initialize the op-specific members */
987    op->stream_array_count = stream_count;
988    op->stream_offset_array = stream_offset_array;
989    op->stream_size_array = stream_size_array;
990    op->out_size_p = out_size_p;
991
992    op->mem_array_count = mem_count;
993    op->mem_offset_array = mem_offset_array;
994    op->mem_size_array = mem_size_array;
995    op->queued_op_ptr = q_op_p;
996
997    ret = dbpf_open_cache_get(
998        coll_id, handle,
999        DBPF_FD_DIRECT_READ,
1000        &op->open_ref);
1001    if(ret < 0)
1002    {
1003        if(ret == -TROVE_ENOENT)
1004        {
1005            /* We create the bstream lazily, so here we'll just assume the read
1006             * was done before writes to this bstream occured, and return
1007             * a successful read of size 0.
1008             */
1009            *out_size_p = 0;
1010            ret = DBPF_OP_COMPLETE;
1011        }
1012        dbpf_queued_op_free(q_op_p);
1013        return ret;
1014    }
1015
1016    *out_op_id_p = q_op_p->op.id;
1017    ret = PINT_manager_id_post(
1018        io_thread_mgr, q_op_p, &q_op_p->mgr_op_id,
1019        dbpf_bstream_direct_read_op_svc, op, NULL, io_queue_id);
1020    if(ret < 0)
1021    {
1022        gossip_err("%s: failed to post direct read op: (error=%d)\n", __func__, ret);
1023        return ret;
1024    }
1025
1026    return DBPF_OP_CONTINUE;
1027}
1028
1029static int dbpf_bstream_direct_write_list(TROVE_coll_id coll_id,
1030                                          TROVE_handle handle,
1031                                          char **mem_offset_array,
1032                                          TROVE_size *mem_size_array,
1033                                          int mem_count,
1034                                          TROVE_offset *stream_offset_array,
1035                                          TROVE_size *stream_size_array,
1036                                          int stream_count,
1037                                          TROVE_size *out_size_p,
1038                                          TROVE_ds_flags flags,
1039                                          TROVE_vtag_s *vtag,
1040                                          void *user_ptr,
1041                                          TROVE_context_id context_id,
1042                                          TROVE_op_id *out_op_id_p,
1043                                          PVFS_hint hints)
1044{
1045
1046    dbpf_queued_op_t *q_op_p = NULL;
1047    struct dbpf_bstream_rw_list_op *op;
1048    struct dbpf_collection *coll_p = NULL;
1049    int ret;
1050
1051    coll_p = dbpf_collection_find_registered(coll_id);
1052    if (coll_p == NULL)
1053    {
1054        return -TROVE_EINVAL;
1055    }
1056
1057    q_op_p = dbpf_queued_op_alloc();
1058    if(!q_op_p)
1059    {
1060        return -TROVE_ENOMEM;
1061    }
1062    dbpf_queued_op_init(q_op_p,
1063                        BSTREAM_WRITE_LIST,
1064                        handle,
1065                        coll_p,
1066                        NULL,
1067                        user_ptr,
1068                        TROVE_SYNC,
1069                        context_id);
1070
1071    op = &q_op_p->op.u.b_rw_list;
1072
1073    /* initialize the op-specific members */
1074    op->stream_array_count = stream_count;
1075    op->stream_offset_array = stream_offset_array;
1076    op->stream_size_array = stream_size_array;
1077    op->out_size_p = out_size_p;
1078
1079    op->mem_array_count = mem_count;
1080    op->mem_offset_array = mem_offset_array;
1081    op->mem_size_array = mem_size_array;
1082    op->queued_op_ptr = q_op_p;
1083
1084    ret = dbpf_open_cache_get(
1085        coll_id, handle,
1086        DBPF_FD_DIRECT_WRITE,
1087        &op->open_ref);
1088    if(ret < 0)
1089    {
1090        dbpf_queued_op_free(q_op_p);
1091        return ret;
1092    }
1093
1094    *out_op_id_p = q_op_p->op.id;
1095
1096    gossip_debug(GOSSIP_DIRECTIO_DEBUG, "%s: queuing direct write operation\n", __func__);
1097    PINT_manager_id_post(
1098        io_thread_mgr, q_op_p, &q_op_p->mgr_op_id,
1099        dbpf_bstream_direct_write_op_svc, op, NULL, io_queue_id);
1100
1101    return DBPF_OP_CONTINUE;
1102}
1103
1104static int dbpf_bstream_direct_resize_op_svc(struct dbpf_op *op_p)
1105{
1106    int ret;
1107    TROVE_ds_attributes attr;
1108    TROVE_object_ref ref;
1109    dbpf_queued_op_t *q_op_p;
1110    struct open_cache_ref open_ref;
1111    PVFS_size tmpsize;
1112
1113    q_op_p = (dbpf_queued_op_t *)op_p->u.b_resize.queued_op_ptr;
1114    ref.fs_id = op_p->coll_p->coll_id;
1115    ref.handle = op_p->handle;
1116
1117    gen_mutex_lock(&dbpf_update_size_lock);
1118    ret = dbpf_dspace_attr_get(op_p->coll_p, ref, &attr);
1119    if(ret != 0)
1120    {
1121        gen_mutex_unlock(&dbpf_update_size_lock);
1122        return ret;
1123    }
1124
1125    tmpsize = op_p->u.b_resize.size;
1126    attr.u.datafile.b_size = tmpsize;
1127
1128    ret = dbpf_dspace_attr_set(op_p->coll_p, ref, &attr);
1129    if(ret < 0)
1130    {
1131        gen_mutex_unlock(&dbpf_update_size_lock);
1132        return ret;
1133    }
1134    gen_mutex_unlock(&dbpf_update_size_lock);
1135
1136    /* setup op for sync coalescing */
1137    dbpf_queued_op_init(q_op_p,
1138                        DSPACE_SETATTR,
1139                        ref.handle,
1140                        q_op_p->op.coll_p,
1141                        dbpf_dspace_setattr_op_svc,
1142                        q_op_p->op.user_ptr,
1143                        TROVE_SYNC,
1144                        q_op_p->op.context_id);
1145    q_op_p->op.state = OP_IN_SERVICE;
1146
1147    /* truncate file after attributes are set */
1148    ret = dbpf_open_cache_get(
1149        op_p->coll_p->coll_id, op_p->handle,
1150        DBPF_FD_DIRECT_WRITE,
1151        &open_ref);
1152    if(ret < 0)
1153    {
1154        return ret;
1155    }
1156
1157    ret = DBPF_RESIZE(open_ref.fd, tmpsize);
1158    if(ret < 0)
1159    {
1160        return(ret);
1161    }
1162
1163    dbpf_open_cache_put(&open_ref);
1164
1165    return DBPF_OP_COMPLETE;
1166}
1167
1168static int dbpf_bstream_direct_resize(TROVE_coll_id coll_id,
1169                                      TROVE_handle handle,
1170                                      TROVE_size *inout_size_p,
1171                                      TROVE_ds_flags flags,
1172                                      TROVE_vtag_s *vtag,
1173                                      void *user_ptr,
1174                                      TROVE_context_id context_id,
1175                                      TROVE_op_id *out_op_id_p,
1176                                      PVFS_hint hints)
1177{
1178    dbpf_queued_op_t *q_op_p = NULL;
1179    struct dbpf_collection *coll_p = NULL;
1180
1181    coll_p = dbpf_collection_find_registered(coll_id);
1182    if (coll_p == NULL)
1183    {
1184        return -TROVE_EINVAL;
1185    }
1186
1187    q_op_p = dbpf_queued_op_alloc();
1188    if (q_op_p == NULL)
1189    {
1190        return -TROVE_ENOMEM;
1191    }
1192
1193    /* initialize all the common members */
1194    dbpf_queued_op_init(q_op_p,
1195                        BSTREAM_RESIZE,
1196                        handle,
1197                        coll_p,
1198                        dbpf_bstream_direct_resize_op_svc,
1199                        user_ptr,
1200                        flags,
1201                        context_id);
1202
1203    /* initialize the op-specific members */
1204    q_op_p->op.u.b_resize.size = *inout_size_p;
1205    q_op_p->op.u.b_resize.queued_op_ptr = q_op_p;
1206    *out_op_id_p = dbpf_queued_op_queue(q_op_p);
1207
1208    return 0;
1209}
1210
1211static int dbpf_bstream_direct_validate(TROVE_coll_id coll_id,
1212                                        TROVE_handle handle,
1213                                        TROVE_ds_flags flags,
1214                                        TROVE_vtag_s *vtag,
1215                                        void *user_ptr,
1216                                        TROVE_context_id context_id,
1217                                        TROVE_op_id *out_op_id_p,
1218                                        PVFS_hint hints)
1219{
1220    return -TROVE_ENOSYS;
1221}
1222
1223static int dbpf_bstream_direct_flush(TROVE_coll_id coll_id,
1224                                     TROVE_handle handle,
1225                                     TROVE_ds_flags flags,
1226                                     void *user_ptr,
1227                                     TROVE_context_id context_id,
1228                                     TROVE_op_id *out_op_id_p,
1229                                     PVFS_hint hints)
1230{
1231    return DBPF_OP_COMPLETE;
1232}
1233
1234static int dbpf_bstream_direct_cancel(
1235    TROVE_coll_id coll_id,
1236    TROVE_op_id cancel_id,
1237    TROVE_context_id context_id)
1238{
1239    dbpf_queued_op_t *op;
1240    int ret;
1241
1242    op = id_gen_fast_lookup(cancel_id);
1243    if(!op)
1244    {
1245        gossip_lerr("Invalid op-id to cancel\n");
1246        return -TROVE_EINVAL;
1247    }
1248
1249    ret = PINT_manager_cancel(io_thread_mgr, op->mgr_op_id);
1250    if(ret < 0)
1251    {
1252        return ret|PVFS_ERROR_TROVE;
1253    }
1254
1255    return ret;
1256}
1257
1258struct TROVE_bstream_ops dbpf_bstream_direct_ops =
1259{
1260    dbpf_bstream_direct_read_at,
1261    dbpf_bstream_direct_write_at,
1262    dbpf_bstream_direct_resize,
1263    dbpf_bstream_direct_validate,
1264    dbpf_bstream_direct_read_list,
1265    dbpf_bstream_direct_write_list,
1266    dbpf_bstream_direct_flush,
1267    dbpf_bstream_direct_cancel
1268};
1269
1270static int dbpf_bstream_get_extents(
1271    char **mem_offset_array,
1272    TROVE_size *mem_size_array,
1273    int mem_count,
1274    TROVE_offset *stream_offset_array,
1275    TROVE_size *stream_size_array,
1276    int stream_count,
1277    int *ext_count,
1278    dbpf_stream_extents_t *extents)
1279{
1280    int mct = 0, sct = 0, act = 0;
1281    int oom = 0, oos = 0;
1282    TROVE_size cur_mem_size = 0;
1283    char *cur_mem_off = NULL;
1284    char *ext_ptr = NULL;
1285    TROVE_size ext_size = 0, cur_stream_size = 0;
1286    TROVE_offset ext_off = 0, cur_stream_off = 0;
1287
1288    cur_mem_size = mem_size_array[mct];
1289    cur_mem_off = mem_offset_array[mct];
1290
1291    cur_stream_size = stream_size_array[sct];
1292    cur_stream_off = stream_offset_array[sct];
1293
1294    while (1)
1295    {
1296        /*
1297          determine if we're either out of memory (oom) regions, or
1298          out of stream (oos) regions
1299        */
1300        /* in many (all?) cases mem_count is 1, so oom will end up being 1 */
1301        oom = (((mct + 1) < mem_count) ? 0 : 1);
1302        oos = (((sct + 1) < stream_count) ? 0 : 1);
1303
1304        if (cur_mem_size == cur_stream_size)
1305        {
1306            /* consume both mem and stream regions */
1307            ext_size = cur_mem_size;
1308            ext_ptr = cur_mem_off;
1309            ext_off = cur_stream_off;
1310
1311            if (!oom)
1312            {
1313                cur_mem_size = mem_size_array[++mct];
1314                cur_mem_off  = mem_offset_array[mct];
1315            }
1316            else
1317            {
1318                cur_mem_size = 0;
1319            }
1320            if (!oos)
1321            {
1322                cur_stream_size = stream_size_array[++sct];
1323                cur_stream_off  = stream_offset_array[sct];
1324            }
1325            else
1326            {
1327                cur_stream_size = 0;
1328            }
1329        }
1330        else if (cur_mem_size < cur_stream_size)
1331        {
1332            /* consume mem region and update stream region */
1333            ext_size = cur_mem_size;
1334            ext_ptr = cur_mem_off;
1335            ext_off = cur_stream_off;
1336
1337            cur_stream_size -= cur_mem_size;
1338            cur_stream_off  += cur_mem_size;
1339
1340            if (!oom)
1341            {
1342                cur_mem_size = mem_size_array[++mct];
1343                cur_mem_off  = mem_offset_array[mct];
1344            }
1345            else
1346            {
1347                cur_mem_size = 0;
1348            }
1349        }
1350        else /* cur_mem_size > cur_stream_size */
1351        {
1352            /* consume stream region and update mem region */
1353            ext_size = cur_stream_size;
1354            ext_ptr = cur_mem_off;
1355            ext_off = cur_stream_off;
1356
1357            cur_mem_size -= cur_stream_size;
1358            cur_mem_off  += cur_stream_size;
1359
1360            if (!oos)
1361            {
1362                cur_stream_size = stream_size_array[++sct];
1363                cur_stream_off  = stream_offset_array[sct];
1364            }
1365            else
1366            {
1367                cur_stream_size = 0;
1368            }
1369        }
1370
1371        if(extents)
1372        {
1373            extents[act].buffer = ext_ptr;
1374            extents[act].offset = ext_off;
1375            extents[act].size =   ext_size;
1376        }
1377        act++;
1378
1379        /* process until there are no bytes left in the current piece */
1380        if ((oom && cur_mem_size == 0) || (oos && cur_stream_size == 0))
1381        {
1382            break;
1383        }
1384    }
1385
1386    /* return the number actually used */
1387    *ext_count = act;
1388    return 0;
1389}
1390
1391#if 0
1392int dbpf_aligned_blocks_init(void)
1393{
1394    int i;
1395
1396    aligned_blocks_buffer = PINT_mem_aligned_alloc(BLOCK_SIZE*256, BLOCK_SIZE);
1397    blocks = malloc(sizeof(*blocks) * 256);
1398    used_count = 0;
1399    gen_mutex_lock(&aligned_blocks_mutex);
1400    for(i = 0; i < 256; ++i)
1401    {
1402        blocks[i].ptr = ((char *)aligned_blocks_buffer) + (i*BLOCK_SIZE);
1403        qlist_add_tail(&(blocks[i].link), &aligned_blocks_unused);
1404    }
1405    gen_mutex_unlock(&aligned_blocks_mutex);
1406    return 0;
1407}
1408
1409int dbpf_aligned_blocks_finalize(void)
1410{
1411    free(blocks);
1412    PINT_mem_aligned_free(aligned_blocks_buffer);
1413    return 0;
1414}
1415
1416void *dbpf_aligned_block_get(void)
1417{
1418    void *ptr;
1419    struct aligned_block *ablock;
1420    gen_mutex_lock(&aligned_blocks_mutex);
1421    if(used_count > 255)
1422    {
1423        gossip_debug(GOSSIP_DIRECTIO_DEBUG, "ran out of aligned blocks: %d\n",
1424                     used_count);
1425        gen_mutex_unlock(&aligned_blocks_mutex);
1426        return NULL;
1427    }
1428    if(qlist_empty(&aligned_blocks_unused))
1429    {
1430        gossip_debug(GOSSIP_DIRECTIO_DEBUG,
1431                     "aligned_block_get: unused list empty.\n");
1432        gen_mutex_unlock(&aligned_blocks_mutex);
1433        return NULL;
1434    }
1435
1436    ablock = qlist_entry(aligned_blocks_unused.next, struct aligned_block, link);
1437    qlist_del(&ablock->link);
1438    ptr = ablock->ptr;
1439    ablock->ptr = NULL;
1440    qlist_add_tail(&ablock->link, &aligned_blocks_used);
1441    ++used_count;
1442    gen_mutex_unlock(&aligned_blocks_mutex);
1443    return ptr;
1444}
1445
1446int dbpf_aligned_block_put(void *ptr)
1447{
1448    struct aligned_block *ablock;
1449
1450    gen_mutex_lock(&aligned_blocks_mutex);
1451    ablock = qlist_entry(aligned_blocks_used.next, struct aligned_block, link);
1452    qlist_del(&ablock->link);
1453    ablock->ptr = ptr;
1454    qlist_add_tail((&(ablock->link)), &aligned_blocks_unused);
1455    --used_count;
1456    gen_mutex_unlock(&aligned_blocks_mutex);
1457    return 0;
1458}
1459#endif
1460
1461/*
1462 * Local variables:
1463 *  c-indent-level: 4
1464 *  c-basic-offset: 4
1465 * End:
1466 *
1467 * vim: ts=8 sts=4 sw=4 expandtab
1468 */
Note: See TracBrowser for help on using the browser.