root/branches/Orange-Branch/src/kernel/linux-2.6/file.c @ 8876

Revision 8876, 114.6 KB (checked in by mtmoore, 2 years ago)

remove non-error gossip_err and fix kernel2.4 build

Line 
1/*
2 * (C) 2001 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7/** \file
8 *  \ingroup pvfs2linux
9 *
10 *  Linux VFS file operations.
11 */
12
13#include "pvfs2-kernel.h"
14#include "pvfs2-bufmap.h"
15#include "pvfs2-types.h"
16#include "pvfs2-internal.h"
17#include <linux/fs.h>
18#include <linux/pagemap.h>
19
/*
 * Direction of an I/O request.  The plain, vectored (V) and
 * non-contiguous (X) names are aliases of one another: every read
 * flavour has the value 0 and every write flavour has the value 1.
 */
enum io_type {
    IO_READ   = 0,
    IO_WRITE  = 1,
    IO_READV  = IO_READ,
    IO_WRITEV = IO_WRITE,
    IO_READX  = IO_READ,
    IO_WRITEX = IO_WRITE,
};
28
29struct rw_options;
30
31#ifdef PVFS2_LINUX_KERNEL_2_4
32static int pvfs2_precheck_file_write(struct file *file, struct inode *inode,
33    size_t *count, loff_t *ppos);
34#endif
35
36static ssize_t wait_for_cached_io(struct rw_options *old_rw,
37                                  struct iovec *vec,
38                                  int nr_segs,
39                                  size_t total_size) __attribute__((unused));
40
41static ssize_t wait_for_direct_io(struct rw_options *rw,
42                                  struct iovec *vec,
43                                  unsigned long  nr_segs,
44                                  size_t total_size);
45
46static ssize_t wait_for_iox(struct rw_options *rw,
47                            struct iovec *vec,
48                            unsigned long  nr_segs,
49                            struct xtvec *xtvec,
50                            unsigned long xtnr_segs,
51                            size_t total_size);
52
/*
 * Flag an operation's I/O as completed and wake up whoever is sleeping
 * on its completion wait queue.  The flag is set while holding the
 * operation's lock.
 *
 * @op: pointer to the operation.  The argument is parenthesized so any
 *      pointer-valued expression is accepted, but it is evaluated
 *      several times: do not pass an expression with side effects.
 */
#define wake_up_daemon_for_return(op)               \
do {                                                \
  spin_lock(&(op)->lock);                           \
  (op)->io_completed = 1;                           \
  spin_unlock(&(op)->lock);                         \
  wake_up_interruptible(&(op)->io_completion_waitq);\
} while(0)
60
61#ifndef HAVE_COMBINED_AIO_AND_VECTOR
/* Kernels older than 2.6.19 call this entry point generic_file_read instead */
63#define do_sync_read generic_file_read
64#endif
65
66/** Called when a process requests to open a file.
67 */
68int pvfs2_file_open(
69    struct inode *inode,
70    struct file *file)
71{
72    int ret = -EINVAL;
73
74    gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_file_open: called on %s (inode is %llu)\n",
75                file->f_dentry->d_name.name, llu(get_handle_from_ino(inode)));
76
77    inode->i_mapping->host = inode;
78    inode->i_mapping->a_ops = &pvfs2_address_operations;
79#ifndef PVFS2_LINUX_KERNEL_2_4
80    inode->i_mapping->backing_dev_info = &pvfs2_backing_dev_info;
81#endif
82
83    if (S_ISDIR(inode->i_mode))
84    {
85        ret = dcache_dir_open(inode, file);
86    }
87    else
88    {
89        /*
90          if the file's being opened for append mode, set the file pos
91          to the end of the file when we retrieve the size (which we
92          must forcefully do here in this case, afaict atm)
93        */
94        if (file->f_flags & O_APPEND)
95        {
96            /*
97             * When we do a getattr in response to an open with O_APPEND,
98             * all we are interested in is the file size. Hence we will
99             * set the mask to only the size and nothing else
100             * Hopefully, this will help us in reducing the number of getattr's
101             */
102            ret = pvfs2_inode_getattr(inode, PVFS_ATTR_SYS_SIZE);
103            if (ret == 0)
104            {
105                file->f_pos = pvfs2_i_size_read(inode);
106                gossip_debug(GOSSIP_FILE_DEBUG, "f_pos = %ld\n", (unsigned long)file->f_pos);
107            }
108            else
109            {
110                gossip_debug(GOSSIP_FILE_DEBUG, "%s:%s:%d calling make bad inode\n", __FILE__,  __func__, __LINE__);
111                pvfs2_make_bad_inode(inode);
112                gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_file_open returning error: %d\n", ret);
113                return(ret);
114            }
115        }
116
117        /*
118          fs/open.c: returns 0 after enforcing large file support if
119          running on a 32 bit system w/o O_LARGFILE flag
120        */
121        ret = generic_file_open(inode, file);
122    }
123
124    gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_file_open returning normally: %d\n", ret);
125    return ret;
126}
127
/* Kind of memory an I/O operation copies to or from. */
enum dest_type {
    COPY_DEST_ADDRESSES,    /* addresses (user or kernel virtual addresses) */
    COPY_DEST_PAGES         /* pointers to struct page */
};
134
135struct rw_options {
136    /* whether or not it is a synchronous I/O operation */
137    int            async;
138    /* whether it is a READ/WRITE operation */
139    enum io_type   type;
140    /* whether we are copying to addresses/pages */
141    enum dest_type copy_dest_type;
142    struct file   *file;
143    struct inode  *inode;
144    pvfs2_inode_t *pvfs2_inode;
145    loff_t readahead_size;
146    /* whether the destination addresses are in user/kernel */
147    int copy_to_user_addresses;
148    const char *fnstr;
149    ssize_t count;
150    /* Asynch I/O control block */
151    struct kiocb *iocb;
152    union {
153        struct {
154            const struct iovec *iov;
155            unsigned long nr_segs;
156        } address;
157        struct {
158            /* byte-map of which pages are locked down for I/o */
159            unsigned char *pg_byte_map;
160            /* All pages spanning a given I/O operation */
161            struct page  **pages;
162            /* count of such pages */
163            unsigned long nr_pages;
164            /* Only those pages that need to be fetched */
165            struct page  **issue_pages;
166            /* and the count of such pages */
167            unsigned long nr_issue_pages;
168            /* list of pages for which I/O needs to be
169             * done as dictated by read_cache_pages
170             */
171            struct list_head page_list;
172        } pages;
173    } dest;
174    union {
175        /* Contiguous file I/O operations use a single offset */
176        struct {
177            loff_t        *offset;
178        } io;
179        /* Non-contiguous file I/O operations use a vector of offsets */
180        struct {
181            struct xtvec  *xtvec;
182            unsigned long  xtnr_segs;
183        } iox;
184    } off;
185};
186
187/*
188 * Copy to client-core's address space from the buffers specified
189 * by the iovec upto total_size bytes.
190 * NOTE: the iovector can either contain addresses which
191 *       can futher be kernel-space or user-space addresses.
192 *       or it can pointers to struct page's
193 * @buffer_index: index used by client-core's buffers
194 * @rw: operation context (read/write) holding the state of the I/O
195 * @vec: iovector
196 * @nr_segs: number of segments in the iovector
197 * @total_size: Total size in bytes to be copied into client-core.
198 */
199static int precopy_buffers(int buffer_index,
200                           struct rw_options *rw,
201                           const struct iovec *vec,
202                           unsigned long nr_segs,
203                           size_t total_size)
204{
205    int ret = 0;
206
207    if (rw->type == IO_WRITEV)
208    {
209        /*
210         * copy data from application/kernel by pulling it out
211         * of the iovec. NOTE: target buffers can be addresses
212         * or struct page pointers
213         */
214        if (rw->copy_dest_type == COPY_DEST_ADDRESSES) {
215            /* Are we copying from User Virtual Addresses? */
216            if (rw->copy_to_user_addresses)
217            {
218                ret = pvfs_bufmap_copy_iovec_from_user(buffer_index,
219                                                       vec,
220                                                       nr_segs,
221                                                       total_size);
222            }
223            /* Are we copying from Kernel Virtual Addresses? */
224            else {
225                ret = pvfs_bufmap_copy_iovec_from_kernel(buffer_index,
226                                                         vec,
227                                                         nr_segs,
228                                                         total_size);
229            }
230        }
231        else {
232            /* We must be copying from struct page pointers */
233            ret = pvfs_bufmap_copy_from_pages(buffer_index,
234                                              vec,
235                                              nr_segs,
236                                              total_size);
237        }
238        if (ret < 0)
239        {
240            gossip_err("%s: Failed to copy-in buffers. Please make sure "
241                        "that the pvfs2-client is running. %ld\n",
242                        rw->fnstr, (long) ret);
243        }
244    }
245    return ret;
246}
247
248/*
249 * Copy from client-core's address space to the buffers specified
250 * by the iovec upto total_size bytes.
251 * NOTE: the iovector can either contain addresses which
252 *       can futher be kernel-space or user-space addresses.
253 *       or it can pointers to struct page's
254 * @buffer_index: index used by client-core's buffers
255 * @rw: operation context (read/write) holding the state of the I/O
256 * @vec: iovector
257 * @nr_segs: number of segments in the iovector
258 * @total_size: Total size in bytes to be copied from client-core.
259 */
260static int postcopy_buffers(int buffer_index, struct rw_options *rw,
261        const struct iovec *vec, int nr_segs, size_t total_size)
262{
263    int ret = 0;
264
265    if (rw->type == IO_READV)
266    {
267        /*
268         * copy data to application/kernel by pushing it out to the iovec.
269         * NOTE; target buffers can be addresses or struct page pointers
270         */
271        if (total_size)
272        {
273            if (rw->copy_dest_type == COPY_DEST_ADDRESSES) {
274                /* Are we copying to User Virtual Addresses? */
275                if (rw->copy_to_user_addresses)
276                {
277                    ret = pvfs_bufmap_copy_to_user_iovec(buffer_index, vec,
278                            nr_segs, total_size);
279
280                }
281                /* Are we copying to Kernel Virtual Addresses? */
282                else
283                {
284                    ret = pvfs_bufmap_copy_to_kernel_iovec(buffer_index, vec,
285                            nr_segs, total_size);
286                }
287            }
288            else {
289                /* We must be copying to struct page pointers */
290                ret = pvfs_bufmap_copy_to_pages(buffer_index, vec,
291                            nr_segs, total_size);
292            }
293            if (ret < 0)
294            {
295                gossip_err("%s: Failed to copy-out buffers.  Please make sure "
296                            "that the pvfs2-client is running (%ld)\n",
297                            rw->fnstr, (long) ret);
298            }
299        }
300    }
301    return ret;
302}
303
304#ifndef PVFS2_LINUX_KERNEL_2_4
305
306/* Copy from page-cache to application address space
307 * @rw - operation context, contains information about the I/O operation
308 *       and holds the pointers to the page-cache page array from which
309 *       the copies are to be initiated.
310 * @vec - iovec describing the layout of buffers in user-space
311 * @nr_segs - number of segments in the iovec
312 * @total_actual_io - total size of the buffers to be copied.
313 */
314static int copy_from_pagecache(struct rw_options *rw,
315                               const struct iovec *vec,
316                               unsigned long nr_segs,
317                               size_t total_actual_io)
318{
319    struct iovec *copied_iovec = NULL;
320    size_t amt_copied = 0, cur_copy_size = 0;
321    int ret = 0;
322    unsigned long seg, page_offset = 0;
323    int index = 0;
324    void __user *to_addr = NULL;
325
326    gossip_debug(GOSSIP_FILE_DEBUG, "copy_from_pagecache: "
327                 "nr_segs %ld, total_actual_io: %zd, total pages %ld\n",
328                 nr_segs, total_actual_io, rw->dest.pages.nr_pages);
329    /*
330     * copy the passed in iovec so that we can change some of its fields
331     */
332    copied_iovec = kmalloc(nr_segs * sizeof(*copied_iovec),
333                           PVFS2_BUFMAP_GFP_FLAGS);
334    if (copied_iovec == NULL)
335    {
336        gossip_err("copy_from_pagecache: failed allocating memory\n");
337        return -ENOMEM;
338    }
339    memcpy(copied_iovec, vec, nr_segs * sizeof(*copied_iovec));
340    /*
341     * Go through each segment in the iovec and make sure that
342     * the summation of iov_len is greater than the given size.
343     */
344    for (seg = 0, amt_copied = 0; seg < nr_segs; seg++)
345    {
346        amt_copied += copied_iovec[seg].iov_len;
347    }
348    if (amt_copied < total_actual_io)
349    {
350        gossip_err("copy_from_pagecache: user buffer size (%zd) "
351                   "is less than I/O size (%zd)\n",
352                    amt_copied, total_actual_io);
353        kfree(copied_iovec);
354        return -EINVAL;
355    }
356    index = 0;
357    amt_copied = 0;
358    seg = 0;
359    page_offset = 0;
360    /*
361     * Go through each segment in the iovec and copy from the page-cache,
362     * but make sure that we do so one page at a time.
363     */
364    while (amt_copied < total_actual_io)
365    {
366        struct iovec *iv = &copied_iovec[seg];
367        int inc_index = 0;
368        void *from_kaddr;
369
370        if (index >= rw->dest.pages.nr_pages) {
371            gossip_err("index cannot exceed number of allocated pages %ld\n",
372                    (long) rw->dest.pages.nr_pages);
373            kfree(copied_iovec);
374            return -EINVAL;
375        }
376
377        if (iv->iov_len < (PAGE_CACHE_SIZE - page_offset))
378        {
379            cur_copy_size = iv->iov_len;
380            seg++;
381            to_addr = iv->iov_base;
382            inc_index = 0;
383        }
384        else if (iv->iov_len == (PAGE_CACHE_SIZE - page_offset))
385        {
386            cur_copy_size = iv->iov_len;
387            seg++;
388            to_addr = iv->iov_base;
389            inc_index = 1;
390        }
391        else
392        {
393            cur_copy_size = (PAGE_CACHE_SIZE - page_offset);
394            to_addr = iv->iov_base;
395            iv->iov_base += cur_copy_size;
396            iv->iov_len  -= cur_copy_size;
397            inc_index = 1;
398        }
399#if 0
400        gossip_debug(GOSSIP_FILE_DEBUG, "copy_from_pagecache: copying to "
401                "user %p, kernel page %p\n", to_addr, rw->dest.pages.pages[index]);
402#endif
403        from_kaddr = pvfs2_kmap(rw->dest.pages.pages[index]);
404        ret = copy_to_user(to_addr, from_kaddr + page_offset, cur_copy_size);
405        pvfs2_kunmap(rw->dest.pages.pages[index]);
406#if 0
407        gossip_debug(GOSSIP_FILE_DEBUG, "copy_from_pagecache: copying to user %p from "
408                "kernel %p %d bytes (from_kaddr:%p, page_offset:%d)\n",
409                to_addr, from_kaddr + page_offset, cur_copy_size, from_kaddr, page_offset);
410#endif
411        if (ret)
412        {
413            gossip_err("Failed to copy data to user space\n");
414            kfree(copied_iovec);
415            return -EFAULT;
416        }
417
418        amt_copied += cur_copy_size;
419        if (inc_index) {
420            page_offset = 0;
421            index++;
422        }
423        else {
424            page_offset += cur_copy_size;
425        }
426    }
427    kfree(copied_iovec);
428    return 0;
429}
430
431#endif //#ifndef PVFS2_LINUX_KERNEL_2_4
432
433/*
434 * Post and wait for the I/O upcall to finish
435 * @rw - contains state information to initiate the I/O operation
436 * @vec- contains the memory vector regions
437 * @nr_segs - number of memory vector regions
438 * @total_size - total expected size of the I/O operation
439 */
440static ssize_t wait_for_direct_io(struct rw_options *rw,
441                                  struct iovec *vec,
442                                  unsigned long nr_segs,
443                                  size_t total_size)
444{
445    pvfs2_kernel_op_t *new_op = NULL;
446    int buffer_index = -1;
447    ssize_t ret;
448
449    if (!rw || !vec || nr_segs < 0 || total_size <= 0
450            || !rw->pvfs2_inode || !rw->inode || !rw->fnstr)
451    {
452        gossip_lerr("invalid parameters (rw: %p, vec: %p, nr_segs: %lu, "
453                "total_size: %zd)\n", rw, vec, nr_segs, total_size);
454        ret = -EINVAL;
455        goto out;
456    }
457    new_op = op_alloc(PVFS2_VFS_OP_FILE_IO);
458    if (!new_op)
459    {
460        ret = -ENOMEM;
461        goto out;
462    }
463    /* synchronous I/O */
464    new_op->upcall.req.io.async_vfs_io = PVFS_VFS_SYNC_IO;
465    new_op->upcall.req.io.readahead_size = (int32_t) rw->readahead_size;
466    new_op->upcall.req.io.io_type = (rw->type == IO_READV) ?
467                                     PVFS_IO_READ : PVFS_IO_WRITE;
468    new_op->upcall.req.io.refn = rw->pvfs2_inode->refn;
469    /* get a shared buffer index */
470    ret = pvfs_bufmap_get(&buffer_index);
471    if (ret < 0)
472    {
473        gossip_debug(GOSSIP_FILE_DEBUG, "%s: pvfs_bufmap_get failure (%ld)\n",
474                rw->fnstr, (long) ret);
475        goto out;
476    }
477    gossip_debug(GOSSIP_FILE_DEBUG, "GET op %p -> buffer_index %d\n", new_op, buffer_index);
478
479    new_op->upcall.req.io.buf_index = buffer_index;
480    new_op->upcall.req.io.count = total_size;
481    new_op->upcall.req.io.offset = *(rw->off.io.offset);
482
483    gossip_debug(GOSSIP_FILE_DEBUG, "%s: copy_to_user %d nr_segs %lu, "
484            "offset: %llu total_size: %zd\n", rw->fnstr, rw->copy_to_user_addresses,
485            nr_segs, llu(*(rw->off.io.offset)), total_size);
486    /* Stage 1: copy the buffers into client-core's address space */
487    if ((ret = precopy_buffers(buffer_index, rw, vec, nr_segs, total_size)) < 0)
488    {
489        goto out;
490    }
491    /* Stage 2: Service the I/O operation */
492    ret = service_operation(new_op, rw->fnstr,
493         get_interruptible_flag(rw->inode));
494
495    if (ret < 0)
496    {
497          /* this macro is defined in pvfs2-kernel.h */
498          handle_io_error();
499
500          /*
501            don't write an error to syslog on signaled operation
502            termination unless we've got debugging turned on, as
503            this can happen regularly (i.e. ctrl-c)
504          */
505          if (ret == -EINTR)
506          {
507              gossip_debug(GOSSIP_FILE_DEBUG, "%s: returning error %ld\n",
508                      rw->fnstr, (long) ret);
509          }
510          else
511          {
512              gossip_err(
513                    "%s: error in %s handle %llu, "
514                    "FILE: %s, returning %ld\n",
515                    rw->fnstr,
516                    rw->type == IO_READV ? "vectored read from" : "vectored write to",
517                    llu(get_handle_from_ino(rw->inode)),
518                    (rw->file && rw->file->f_dentry && rw->file->f_dentry->d_name.name ?
519                     (char *)rw->file->f_dentry->d_name.name : "UNKNOWN"),
520                    (long) ret);
521          }
522          goto out;
523    }
524    /* Stage 3: Post copy buffers from client-core's address space */
525    if ((ret = postcopy_buffers(buffer_index, rw, vec, nr_segs,
526                    new_op->downcall.resp.io.amt_complete)) < 0) {
527        /* put error codes in downcall so that handle_io_error()
528         * preserves it properly
529         */
530        new_op->downcall.status = ret;
531        handle_io_error();
532        goto out;
533    }
534    ret = new_op->downcall.resp.io.amt_complete;
535    gossip_debug(GOSSIP_FILE_DEBUG, "wait_for_io returning %ld\n", (long) ret);
536    /*
537      tell the device file owner waiting on I/O that this read has
538      completed and it can return now.  in this exact case, on
539      wakeup the daemon will free the op, so we *cannot* touch it
540      after this.
541    */
542    wake_up_daemon_for_return(new_op);
543    new_op = NULL;
544out:
545    if (buffer_index >= 0)
546    {
547        pvfs_bufmap_put(buffer_index);
548        gossip_debug(GOSSIP_FILE_DEBUG, "PUT buffer_index %d\n", buffer_index);
549        buffer_index = -1;
550    }
551    if (new_op)
552    {
553        op_release(new_op);
554        new_op = NULL;
555    }
556    return ret;
557}
558
559/*
560 * The reason we need to do this is to be able to support
561 * readv and writev that are
562 * larger than (pvfs_bufmap_size_query())
563 * Default is PVFS2_BUFMAP_DEFAULT_DESC_SIZE MB.
564 * What that means is that
565 * we will create a new io vec descriptor for those memory addresses that
566 * go beyond the limit
567 * Return value for this routine is -ve in case of errors
568 * and 0 in case of success.
569 * Further, the new_nr_segs pointer is updated to hold the new value
570 * of number of iovecs, the new_vec pointer is updated to hold the pointer
571 * to the new split iovec, and the size array is an array of integers holding
572 * the number of iovecs that straddle pvfs_bufmap_size_query().
573 * The max_new_nr_segs value is computed by the caller and returned.
574 * (It will be (count of all iov_len/ block_size) + 1).
575 */
576static int split_iovecs(
577        unsigned long max_new_nr_segs,      /* IN */
578        unsigned long nr_segs,              /* IN */
579        const struct iovec *original_iovec, /* IN */
580        unsigned long *new_nr_segs,         /* OUT */
581        struct iovec **new_vec,             /* OUT */
582        unsigned long *seg_count,           /* OUT */
583        unsigned long **seg_array)          /* OUT */
584{
585    unsigned long seg, count = 0, begin_seg, tmpnew_nr_segs = 0;
586    struct iovec *new_iovec = NULL, *orig_iovec;
587    unsigned long *sizes = NULL, sizes_count = 0;
588
589    if (nr_segs <= 0 || original_iovec == NULL
590            || new_nr_segs == NULL || new_vec == NULL
591            || seg_count == NULL || seg_array == NULL || max_new_nr_segs <= 0)
592    {
593        gossip_err("Invalid parameters to split_iovecs\n");
594        return -EINVAL;
595    }
596    *new_nr_segs = 0;
597    *new_vec = NULL;
598    *seg_count = 0;
599    *seg_array = NULL;
600    /* copy the passed in iovec descriptor to a temp structure */
601    orig_iovec = kmalloc(nr_segs * sizeof(*orig_iovec),
602                         PVFS2_BUFMAP_GFP_FLAGS);
603    if (orig_iovec == NULL)
604    {
605        gossip_err("split_iovecs: Could not allocate memory for %lu bytes!\n",
606                (unsigned long)(nr_segs * sizeof(*orig_iovec)));
607        return -ENOMEM;
608    }
609    new_iovec = kzalloc(max_new_nr_segs * sizeof(*new_iovec),
610                        PVFS2_BUFMAP_GFP_FLAGS);
611    if (new_iovec == NULL)
612    {
613        kfree(orig_iovec);
614        gossip_err("split_iovecs: Could not allocate memory for %lu bytes!\n",
615                (unsigned long)(max_new_nr_segs * sizeof(*new_iovec)));
616        return -ENOMEM;
617    }
618    sizes = kzalloc(max_new_nr_segs * sizeof(*sizes),
619                    PVFS2_BUFMAP_GFP_FLAGS);
620    if (sizes == NULL)
621    {
622        kfree(new_iovec);
623        kfree(orig_iovec);
624        gossip_err("split_iovecs: Could not allocate memory for %lu bytes!\n",
625                (unsigned long)(max_new_nr_segs * sizeof(*sizes)));
626        return -ENOMEM;
627    }
628    /* copy the passed in iovec to a temp structure */
629    memcpy(orig_iovec, original_iovec, nr_segs * sizeof(*orig_iovec));
630    begin_seg = 0;
631repeat:
632    for (seg = begin_seg; seg < nr_segs; seg++)
633    {
634        if (tmpnew_nr_segs >= max_new_nr_segs || sizes_count >= max_new_nr_segs)
635        {
636            kfree(sizes);
637            kfree(orig_iovec);
638            kfree(new_iovec);
639            gossip_err("split_iovecs: exceeded the index limit (%lu)\n",
640                    tmpnew_nr_segs);
641            return -EINVAL;
642        }
643        if (count + orig_iovec[seg].iov_len < pvfs_bufmap_size_query())
644        {
645            count += orig_iovec[seg].iov_len;
646           
647            memcpy(&new_iovec[tmpnew_nr_segs], &orig_iovec[seg],
648                    sizeof(*new_iovec));
649            tmpnew_nr_segs++;
650            sizes[sizes_count]++;
651        }
652        else
653        {
654            new_iovec[tmpnew_nr_segs].iov_base = orig_iovec[seg].iov_base;
655            new_iovec[tmpnew_nr_segs].iov_len =
656                (pvfs_bufmap_size_query() - count);
657            tmpnew_nr_segs++;
658            sizes[sizes_count]++;
659            sizes_count++;
660            begin_seg = seg;
661            orig_iovec[seg].iov_base += (pvfs_bufmap_size_query() - count);
662            orig_iovec[seg].iov_len  -= (pvfs_bufmap_size_query() - count);
663            count = 0;
664            break;
665        }
666    }
667    if (seg != nr_segs) {
668        goto repeat;
669    }
670    else
671    {
672        sizes_count++;
673    }
674    *new_nr_segs = tmpnew_nr_segs;
675    /* new_iovec is freed by the caller */
676    *new_vec = new_iovec;
677    *seg_count = sizes_count;
678    /* seg_array is also freed by the caller */
679    *seg_array = sizes;
680    kfree(orig_iovec);
681    return 0;
682}
683
/*
 * Compute an upper bound on the number of segments needed once the
 * iovec is re-cut at pvfs_bufmap_size_query() boundaries, and add up
 * the total number of bytes it describes.
 * @curr: iovec to examine
 * @nr_segs: number of segments in curr
 * @total_count: receives the sum of all iov_len values
 * Returns the bound, or -EINVAL if a segment length or the running
 * byte total is negative when viewed as an ssize_t (*total_count is
 * left untouched in that case).
 */
static long bound_max_iovecs(const struct iovec *curr, unsigned long nr_segs, ssize_t *total_count)
{
    ssize_t byte_sum = 0;       /* bytes described by the iovec so far */
    ssize_t pending = 0;        /* running fill used for the estimate */
    long bound = 0;
    unsigned long idx;

    for (idx = 0; idx != nr_segs; ++idx)
    {
        const struct iovec *iv = curr + idx;

        byte_sum += iv->iov_len;
        if (unlikely((ssize_t)(byte_sum|iv->iov_len) < 0))
            return -EINVAL;

        if (pending + iv->iov_len < pvfs_bufmap_size_query())
        {
            /* fits in the current buffer-sized run: one segment */
            pending += iv->iov_len;
            bound++;
            continue;
        }

        /* crosses a boundary: head piece, full runs, and a tail piece */
        pending = (pending + iv->iov_len - pvfs_bufmap_size_query());
        bound += (pending / pvfs_bufmap_size_query() + 2);
    }
    *total_count = byte_sum;
    return bound;
}
713
714#ifndef PVFS2_LINUX_KERNEL_2_4
715
716#ifdef HAVE_OBSOLETE_STRUCT_PAGE_COUNT_NO_UNDERSCORE
717#define pg_ref_count(pg) atomic_read(&(pg)->count)
718#else
719#define pg_ref_count(pg) atomic_read(&(pg)->_count)
720#endif
721
722/*
723 * Cleaning up pages in the cache involves dropping the reference count
724 * while cleaning up pages that were newly allocated involves unlocking
725 * the page after indicating if there was an error in the page.
726 */
727static void cleanup_cache_pages(unsigned long page_idx,
728                                struct rw_options *rw,
729                                int error)
730{
731    unsigned long j;
732
733    gossip_debug(GOSSIP_FILE_DEBUG, "cleaning up %ld memory pages\n", page_idx);
734    /* and pinned existing ones as well */
735    for (j = 0; j < page_idx; j++) {
736        if (rw->dest.pages.pages[j]) {
737            /* if the page was locked for I/O unlock it */
738            if (rw->dest.pages.pg_byte_map[j]) {
739                /* Mark if the page had errors */
740                if (error < 0) {
741                    gossip_lerr("Marking page %ld with error %d\n", j, error);
742                    SetPageError(rw->dest.pages.pages[j]);
743                }
744                /* or if it is indeed uptodate */
745                else {
746                    gossip_debug(GOSSIP_FILE_DEBUG, "Marking page %ld uptodate\n", j);
747                    SetPageUptodate(rw->dest.pages.pages[j]);
748                }
749                unlock_page(rw->dest.pages.pages[j]);
750            } else {
751                /* if it was already cached, decrement its use count */
752                page_cache_release(rw->dest.pages.pages[j]);
753            }
754            gossip_debug(GOSSIP_FILE_DEBUG, "Releasing page %p (refcount %d)\n",
755                    rw->dest.pages.pages[j], pg_ref_count(rw->dest.pages.pages[j]));
756        }
757    }
758    kfree(rw->dest.pages.pages);
759    rw->dest.pages.pages = NULL;
760    rw->dest.pages.nr_pages = 0;
761    kfree(rw->dest.pages.issue_pages);
762    rw->dest.pages.issue_pages = NULL;
763    kfree(rw->dest.pages.pg_byte_map);
764    rw->dest.pages.pg_byte_map = NULL;
765    rw->dest.pages.nr_issue_pages = 0;
766    return;
767}
768
769/* callback from read_cache_pages.
770 * What we are doing is aggregating all the pages in the cache
771 * on which I/O needs to be issued against.
772 * nr_issue_pages is a counter that keeps track of how many such
773 * pages are there and issue_pages is the array that keeps track
774 * of all the pointers to such pages.
775 * All such pages are locked until the I/O completes or an error
776 * happens.
777 */
778static int pvfs2_readpages_fill_cb(void *_data, struct page *page)
779{
780    struct rw_options *rw = (struct rw_options *) _data;
781
782    gossip_debug(GOSSIP_FILE_DEBUG, "nr_issue: %ld page %p\n",
783                 rw->dest.pages.nr_issue_pages, page);
784    rw->dest.pages.issue_pages[rw->dest.pages.nr_issue_pages++] = page;
785    return 0;
786}
787
788
/*
 * lock_mapping_tree()/unlock_mapping_tree(): take and release the lock
 * member of an address_space, using whichever lock field and locking
 * primitive the configure-time HAVE_* macros select.  The macro
 * argument is parenthesized so that any pointer-valued expression can
 * be passed.
 */
#if defined(HAVE_SPIN_LOCK_PAGE_ADDR_SPACE_STRUCT)
#define lock_mapping_tree(mapping) spin_lock(&(mapping)->page_lock)
#define unlock_mapping_tree(mapping) spin_unlock(&(mapping)->page_lock)
#elif defined(HAVE_RW_LOCK_TREE_ADDR_SPACE_STRUCT)
#define lock_mapping_tree(mapping) read_lock(&(mapping)->tree_lock)
#define unlock_mapping_tree(mapping) read_unlock(&(mapping)->tree_lock)
#elif defined(HAVE_SPIN_LOCK_TREE_ADDR_SPACE_STRUCT)
#define lock_mapping_tree(mapping) spin_lock(&(mapping)->tree_lock)
#define unlock_mapping_tree(mapping) spin_unlock(&(mapping)->tree_lock)
#elif defined(HAVE_RT_PRIV_LOCK_ADDR_SPACE_STRUCT)
#define lock_mapping_tree(mapping) spin_lock(&(mapping)->priv_lock)
#define unlock_mapping_tree(mapping) spin_unlock(&(mapping)->priv_lock)
#else
#define lock_mapping_tree(mapping) read_lock_irq(&(mapping)->tree_lock)
#define unlock_mapping_tree(mapping) read_unlock_irq(&(mapping)->tree_lock)
#endif
805
806/* A debugging function to check the contents of a
807 *  mapping's address space/radix tree
808 */
809static int check_mapping_tree(struct address_space *mapping,
810                              size_t file_size) __attribute__((unused));
811static int check_mapping_tree(struct address_space *mapping,
812                              size_t file_size)
813{
814    unsigned long page_idx, begin_index, end_index, nr_to_read;
815
816    begin_index = 0;
817    end_index = (file_size - 1) >> PAGE_CACHE_SHIFT;
818    nr_to_read = end_index - begin_index + 1;
819    lock_mapping_tree(mapping);
820    for (page_idx = 0; page_idx < nr_to_read; page_idx++) {
821        struct page *page;
822        pgoff_t page_offset = begin_index + page_idx;
823
824        if (page_offset > end_index) {
825            break;
826        }
827        page = radix_tree_lookup(&mapping->page_tree, page_offset);
828        if (page) {
829            gossip_debug(GOSSIP_FILE_DEBUG, "check:(%ld) HIT page %p (refcount %d)"
830                                            "(page_offset %ld)\n",
831                                            page_idx, page,
832                                            pg_ref_count(page),
833                                            page_offset);
834        } else {
835            gossip_debug(GOSSIP_FILE_DEBUG, "check: (%ld) MISS (page_offset %ld)\n",
836                                            page_idx, page_offset);
837        }
838    }
839    unlock_mapping_tree(mapping);
840    return 0;
841}
842                           
843
844/* Locate the pages of the file blocks from the page-cache and
845 * store them in the rw_options control block.
846 * Note: if we don't locate, we allocate them.
847 * After that we increment their ref count so that we know for sure that
848 * they won't get swapped out.
849 */
850static int locate_file_pages(struct rw_options *rw, size_t total_size)
851{
852    struct address_space *mapping;
853    loff_t offset, isize;
854    unsigned long page_idx, begin_index, end_index, nr_to_read;
855    int ret = 0;
856    struct page *page;
857   
858    if (!rw ||  !rw->inode || !rw->off.io.offset ||
859        !rw->inode->i_mapping) {
860        gossip_lerr("invalid options\n");
861        return -EINVAL;
862    }
863    isize = pvfs2_i_size_read(rw->inode);
864    rw->copy_dest_type = COPY_DEST_PAGES;
865    /* start with an empty page list */
866    INIT_LIST_HEAD(&rw->dest.pages.page_list);
867    mapping = rw->inode->i_mapping;
868    offset = *(rw->off.io.offset);
869    /* Return if the file size was 0 */
870    if (isize == 0) {
871        rw->dest.pages.nr_pages = 0;
872        rw->dest.pages.pages = NULL;
873        rw->dest.pages.nr_issue_pages = 0;
874        rw->dest.pages.issue_pages = NULL;
875        return 0;
876    }
877    begin_index = offset >> PAGE_CACHE_SHIFT;
878    end_index = (unsigned long) (PVFS_util_min(isize - 1, (offset + total_size - 1))) >> PAGE_CACHE_SHIFT;
879    gossip_debug(GOSSIP_FILE_DEBUG, "filp: %p, inode: %p, mapping: %p\n",
880                                     rw->file, rw->inode, rw->inode->i_mapping);
881    gossip_debug(GOSSIP_FILE_DEBUG, "isize: %ld, offset (%ld) + total_size (%ld): %ld\n",
882                                     (long) isize,
883                                     (long) offset,
884                                     (long) total_size,
885                                     (long) offset + total_size);
886    gossip_debug(GOSSIP_FILE_DEBUG, "offset %lld, begin_index: %ld "
887                                    "end_index: %ld requested total_size: %zd\n",
888                                     offset, begin_index,
889                                     end_index, total_size);
890    nr_to_read = end_index - begin_index + 1;
891    rw->dest.pages.nr_pages = nr_to_read;
892    /* Allocate a byte map for all the pages */
893    rw->dest.pages.pg_byte_map = kzalloc(nr_to_read *
894                                         sizeof(*rw->dest.pages.pg_byte_map),
895                                         PVFS2_BUFMAP_GFP_FLAGS);
896    if (!rw->dest.pages.pg_byte_map) {
897        gossip_err("could not allocate memory\n");
898        return -ENOMEM;
899    }
900    /* and the array to hold the page pointers */
901    rw->dest.pages.pages = kzalloc(nr_to_read * sizeof(*rw->dest.pages.pages),
902                                   PVFS2_BUFMAP_GFP_FLAGS);
903    if (!rw->dest.pages.pages) {
904        gossip_err("could not allocate memory\n");
905        kfree(rw->dest.pages.pg_byte_map);
906        return -ENOMEM;
907    }
908    gossip_debug(GOSSIP_FILE_DEBUG, "read %ld pages\n",
909            nr_to_read);
910
911    lock_mapping_tree(mapping);
912    /* Preallocate all pages, increase their ref counts if they are in cache */
913    for (page_idx = 0; page_idx < nr_to_read; page_idx++) {
914        pgoff_t page_offset = begin_index + page_idx;
915
916        if (page_offset > end_index) {
917            break;
918        }
919        page = radix_tree_lookup(&mapping->page_tree, page_offset);
920        if (page) {
921            page_cache_get(page);
922            gossip_debug(GOSSIP_FILE_DEBUG, "(%ld) HIT page %p (refcount %d)"
923                                            "(page_offset %ld)\n",
924                                            page_idx, page,
925                                            pg_ref_count(page),
926                                            page_offset);
927            rw->dest.pages.pages[page_idx] = page;
928            g_pvfs2_stats.cache_hits++;
929            continue;
930        }
931        g_pvfs2_stats.cache_misses++;
932        unlock_mapping_tree(mapping);
933        /* Allocate, but don't add it to the LRU list yet */
934        page = page_cache_alloc_cold(mapping);
935        lock_mapping_tree(mapping);
936        if (!page) {
937            ret = -ENOMEM;
938            gossip_err("could not allocate page cache\n");
939            break;
940        }
941        page_cache_get(page);
942        gossip_debug(GOSSIP_FILE_DEBUG, "(%ld) MISS page %p (refcount %d)"
943                                        "(page_offset %ld)\n",
944                                        page_idx, page,
945                                        pg_ref_count(page),
946                                        page_offset);
947        page->index = page_offset;
948        /* Add it to our internal private list */
949        list_add(&page->lru, &rw->dest.pages.page_list);
950        rw->dest.pages.pages[page_idx] = page;
951        /* mark in the byte map */
952        rw->dest.pages.pg_byte_map[page_idx] = 1;
953        ret++;
954    }
955    unlock_mapping_tree(mapping);
956    /* cleanup in case of error */
957    if (ret < 0) {
958        gossip_err("could not page_cache_alloc_cold\n");
959        goto cleanup;
960    }
961    rw->dest.pages.nr_issue_pages = 0;
962    /* if there is any need to issue I/O */
963    if (ret > 0)
964    {
965        /* Allocate memory for the pages against which I/O needs to be issued */
966        rw->dest.pages.issue_pages = kzalloc(ret *
967                                             sizeof(*rw->dest.pages.issue_pages),
968                                             PVFS2_BUFMAP_GFP_FLAGS);
969        if (!rw->dest.pages.issue_pages) {
970            gossip_err("could not allocate memory for issue_pages\n");
971            ret = -ENOMEM;
972            goto cleanup;
973        }
974        gossip_debug(GOSSIP_FILE_DEBUG, "issue %d I/O\n", ret);
975        /* read_cache_pages can now be called on the list of pages */
976        read_cache_pages(mapping, &rw->dest.pages.page_list,
977                               pvfs2_readpages_fill_cb, rw);
978        BUG_ON(!list_empty(&rw->dest.pages.page_list));
979        /*
980         * A failed read_cache_pages will be
981         * indicated if
982         * rw->dest.pages.nr_issues_pages != ret
983         */
984        if (rw->dest.pages.nr_issue_pages != ret) {
985            gossip_err("read_cache_pages failed (%ld != %d)\n",
986                 rw->dest.pages.nr_issue_pages, ret);
987            ret = -ENOMEM;
988            goto cleanup;
989        }
990    }
991out:
992    return ret;
993cleanup:
994    /* cleanup any of the allocated pagecache pages */
995    cleanup_cache_pages(page_idx, rw, ret);
996    goto out;
997}
998
999/*
1000 * Given an array of pages and a count of such pages, this function
1001 * returns
1002 * an error if the parameters/pages are invalid/similar
1003 * 0 if the pages are not contiguous on the file
1004 * 1 if the pages are contiguous on file
1005 */
1006static int are_contiguous(int nr_pages, struct page **page_array)
1007{
1008    int i;
1009    pgoff_t fpoffset;
1010    if (!page_array || nr_pages <= 0) {
1011        gossip_err("Bogus parameters %d, page_array: %p\n", nr_pages, page_array);
1012        return -EINVAL;
1013    }
1014    if (!page_array[0]) {
1015        gossip_err("Bogus parameters %p\n", page_array[0]);
1016        return -EINVAL;
1017    }
1018    fpoffset = page_array[0]->index;
1019    for (i = 1; i < nr_pages; i++) {
1020        if (!page_array[i]) {
1021            return -EINVAL;
1022        }
1023        if (page_array[i]->index == fpoffset) {
1024            gossip_err("2 pages have the same file offset (index 0 and %d)\n",
1025                    i);
1026            return -EINVAL;
1027        }
1028        /* not contiguous on file */
1029        if (page_array[i]->index != fpoffset + i) {
1030            gossip_debug(GOSSIP_FILE_DEBUG, "offset at index %d is non-contiguous\n", i);
1031            return 0;
1032        }
1033    }
1034    /* Cool. they are all contiguous */
1035    return 1;
1036}
1037
1038/* Issue any I/O for regions not found in the cache
1039 * NOTE: Try to be smart about whether to issue non-contiguous I/O
1040 * or contiguous I/O.
1041 */
1042static ssize_t wait_for_missing_io(struct rw_options *rw)
1043{
1044    ssize_t err = 0;
1045
1046    if (rw->dest.pages.nr_issue_pages) {
1047        int contig_on_file = 0;
1048
1049        gossip_debug(GOSSIP_FILE_DEBUG, "Number of pages for I/O issue %ld,"
1050                                        " total_size: %ld\n",
1051                rw->dest.pages.nr_issue_pages
1052              , (rw->dest.pages.nr_issue_pages << PAGE_CACHE_SHIFT));
1053        /* scan through the issue pages array and see if we can submit a direct
1054         * contiguous request first.
1055         */
1056        contig_on_file = are_contiguous(rw->dest.pages.nr_issue_pages,
1057                rw->dest.pages.issue_pages);
1058        /* Any errors? */
1059        if (contig_on_file < 0) {
1060            err = contig_on_file;
1061            goto out;
1062        }
1063        /* contiguous or non-contiguous on file */
1064        else {
1065            struct iovec *uncached_vec = NULL;
1066            struct xtvec *uncached_xtvec = NULL;
1067            int i;
1068            size_t total_requested_io;
1069
1070            total_requested_io = (rw->dest.pages.nr_issue_pages << PAGE_CACHE_SHIFT);
1071            uncached_vec = kzalloc(rw->dest.pages.nr_issue_pages *
1072                                   sizeof(*uncached_vec), PVFS2_BUFMAP_GFP_FLAGS);
1073            if (!uncached_vec) {
1074                gossip_err("out of memory allocating uncached_vec\n");
1075                err = -ENOMEM;
1076                goto out;
1077            }
1078            if (!contig_on_file)
1079            {
1080                uncached_xtvec = kzalloc(rw->dest.pages.nr_issue_pages *
1081                                         sizeof(*uncached_xtvec), PVFS2_BUFMAP_GFP_FLAGS);
1082                if (!uncached_xtvec) {
1083                    gossip_err("out of memory allocating uncached_xtvec\n");
1084                    kfree(uncached_vec);
1085                    err = -ENOMEM;
1086                    goto out;
1087                }
1088            }
1089            for (i = 0; i < rw->dest.pages.nr_issue_pages; i++) {
1090                uncached_vec[i].iov_base = rw->dest.pages.issue_pages[i];
1091                uncached_vec[i].iov_len = PAGE_CACHE_SIZE;
1092#if 0
1093                gossip_debug(GOSSIP_FILE_DEBUG, "ISSUE: (%d) "
1094                        "iov_base: %p, iov_len: %zd \n",
1095                        i, uncached_vec[i].iov_base,
1096                        uncached_vec[i].iov_len);
1097#endif
1098                if (!contig_on_file)
1099                {
1100                    uncached_xtvec[i].xtv_off =
1101                        (rw->dest.pages.issue_pages[i]->index << PAGE_CACHE_SHIFT);
1102                    uncached_xtvec[i].xtv_len = PAGE_CACHE_SIZE;
1103                    gossip_debug(GOSSIP_FILE_DEBUG,
1104                            "(%d) xtv_off = %zd, xtv_len = %zd\n",
1105                            i, (size_t) uncached_xtvec[i].xtv_off,
1106                            uncached_xtvec[i].xtv_len);
1107                }
1108            }
1109            /* if all page cache pages are contiguous on file */
1110            if (contig_on_file) {
1111                /* issue a simple direct contiguous I/O call */
1112                err = wait_for_direct_io(rw,
1113                                         uncached_vec,
1114                                         rw->dest.pages.nr_issue_pages,
1115                                         total_requested_io);
1116            }
1117            else {
1118                /* else issue a complicated non-contig I/O call */
1119                err = wait_for_iox(rw,
1120                                   uncached_vec,
1121                                   rw->dest.pages.nr_issue_pages,
1122                                   uncached_xtvec,
1123                                   rw->dest.pages.nr_issue_pages,
1124                                   total_requested_io);
1125                kfree(uncached_xtvec);
1126            }
1127            kfree(uncached_vec);
1128            if (err < 0) {
1129                gossip_err("failed with error %zd\n",
1130                        (size_t) err);
1131                goto out;
1132            }
1133            gossip_debug(GOSSIP_FILE_DEBUG, "wait_for_missing_io: "
1134                    "transferred %zd, requested %zd\n",
1135                     (size_t) err, total_requested_io);
1136        }
1137    }
1138out:
1139    return err;
1140}
1141
1142/*
1143 * NOTE: Currently only immutable files pass their I/O
1144 * through the cache.
1145 * Preparation for cached I/O requires that we locate all the file block
1146 * in the page-cache and stashing those pointers.
1147 * Returns the actual size of completed I/O.
1148 */
1149static ssize_t wait_for_cached_io(struct rw_options *old_rw, struct iovec *vec,
1150        int nr_segs, size_t total_size)
1151{
1152    ssize_t err = 0, total_actual_io = 0;
1153    ssize_t ret = 0;
1154    struct rw_options rw;
1155    loff_t isize, offset;
1156
1157    memcpy(&rw, old_rw, sizeof(rw));
1158    if (rw.type != IO_READV) {
1159        gossip_err("writes are not handled yet!\n");
1160        return -EOPNOTSUPP;
1161    }
1162    offset = *(rw.off.io.offset);
1163    isize = pvfs2_i_size_read(rw.inode);
1164    /* If our file offset was greater than file size, we should return 0 */
1165    if (offset >= isize) {
1166        return 0;
1167    }
1168    /* (Al)locate all the pages in the pagecache first */
1169    if ((err = locate_file_pages(&rw, total_size)) < 0) {
1170        gossip_err("error in locating pages %ld\n", (long) err);
1171        return err;
1172    }
1173    gossip_debug(GOSSIP_FILE_DEBUG, "total_size %zd, total # of pages %ld\n",
1174            total_size, rw.dest.pages.nr_pages);
1175    /* Issue and wait for I/O only for pages that are not uptodate
1176     * or are not found in the cache
1177     */
1178    if ((ret = wait_for_missing_io(&rw)) < 0) {
1179       gossip_err("wait_for_missing_io: error in waiting for missing I/O %ld\n"
1180                 ,(long)err);
1181        goto cleanup;
1182    }
1183    /* return value is basically file size minus current file offset */
1184    //total_actual_io = isize - offset;
1185
1186    /* number of bytes to retrieve from the pagecache should be based on
1187     * the number of bytes returned from wait_for_missing_io, which executes
1188     * the io call with the number of bytes requested and returns the number
1189     * of bytes actually transferred.
1190    */
1191    total_actual_io = ret;
1192
1193    gossip_debug(GOSSIP_FILE_DEBUG, "total_actual_io to be staged from "
1194                                    "page-cache %zd\n", total_actual_io);
1195    /* Copy the data from the page-cache to the application's address space */
1196    err = copy_from_pagecache(&rw, vec, nr_segs, total_actual_io);
1197    err = 0;
1198cleanup:
1199    cleanup_cache_pages(rw.dest.pages.nr_pages, &rw, err);
1200    return err == 0 ? total_actual_io : err;
1201}
1202#endif //#ifndef PVFS2_LINUX_KERNEL_2_4
1203
1204/*
1205 * Common entry point for read/write/readv/writev
1206 * This function will dispatch it to either the direct I/O
1207 * or buffered I/O path depending on the mount options and/or
1208 * augmented/extended metadata attached to the file.
1209 * Note: File extended attributes override any mount options.
1210 */
1211static ssize_t do_readv_writev(struct rw_options *rw)
1212{
1213    ssize_t ret, total_count;
1214    struct inode *inode = NULL;
1215    pvfs2_inode_t *pvfs2_inode = NULL;
1216    struct file *file;
1217    unsigned int to_free;
1218    size_t count;
1219    const struct iovec *iov;
1220    unsigned long nr_segs, seg, new_nr_segs = 0;
1221    unsigned long max_new_nr_segs = 0;
1222    unsigned long  seg_count = 0;
1223    unsigned long *seg_array = NULL;
1224    struct iovec *iovecptr = NULL, *ptr = NULL;
1225    loff_t *offset;
1226
1227    total_count = 0;
1228    ret = -EINVAL;
1229    file = NULL;
1230    inode = NULL;
1231    count =  0;
1232    to_free = 0;
1233    if (!rw || !rw->fnstr)
1234    {
1235        gossip_lerr("Invalid parameters\n");
1236        goto out;
1237    }
1238    offset = rw->off.io.offset;
1239    if (!offset)
1240    {
1241        gossip_err("%s: Invalid offset\n", rw->fnstr);
1242        goto out;
1243    }
1244    inode = rw->inode;
1245    if (!inode)
1246    {
1247        gossip_err("%s: Invalid inode\n", rw->fnstr);
1248        goto out;
1249    }
1250    pvfs2_inode = rw->pvfs2_inode;
1251    if (!pvfs2_inode)
1252    {
1253        gossip_err("%s: Invalid pvfs2 inode\n", rw->fnstr);
1254        goto out;
1255    }
1256    file  = rw->file;
1257    iov = rw->dest.address.iov;
1258    nr_segs = rw->dest.address.nr_segs;
1259    if (iov == NULL || nr_segs < 0)
1260    {
1261        gossip_err("%s: Invalid iovec %p or nr_segs %lu\n",
1262                rw->fnstr, iov, nr_segs);
1263        goto out;
1264    }
1265    /* Compute total and max number of segments after split */
1266    if ((max_new_nr_segs = bound_max_iovecs(iov, nr_segs, &count)) < 0)
1267    {
1268        gossip_lerr("%s: could not bound iovec %lu\n", rw->fnstr
1269                                                     , max_new_nr_segs);
1270        goto out;
1271    }
1272    if (rw->type == IO_WRITEV)
1273    {
1274        if (!file)
1275        {
1276            gossip_err("%s: Invalid file pointer\n", rw->fnstr);
1277            goto out;
1278        }
1279        if (file->f_pos > pvfs2_i_size_read(inode))
1280        {
1281            pvfs2_i_size_write(inode, file->f_pos);
1282        }
1283        /* perform generic linux kernel tests for sanity of write
1284         * arguments
1285         */
1286#ifdef PVFS2_LINUX_KERNEL_2_4
1287        ret = pvfs2_precheck_file_write(file, inode, &count, offset);
1288#else
1289        ret = generic_write_checks(file, offset, &count, S_ISBLK(inode->i_mode));
1290#endif
1291        if (ret != 0)
1292        {
1293            gossip_err("%s: failed generic argument checks.\n", rw->fnstr);
1294            goto out;
1295        }
1296        gossip_debug(GOSSIP_FILE_DEBUG, "%s: proceeding with offset : %llu, "
1297                                        "size %zd\n",
1298                                        rw->fnstr, llu(*offset), count);
1299    }
1300    if (count == 0)
1301    {
1302        ret = 0;
1303        goto out;
1304    }
1305
1306    rw->count = count;
1307    /*
1308     * if the total size of data transfer requested is greater than
1309     * the kernel-set blocksize of PVFS2, then we split the iovecs
1310     * such that no iovec description straddles a block size limit
1311     */
1312    if (count > pvfs_bufmap_size_query())
1313    {
1314        /*
1315         * Split up the given iovec description such that
1316         * no iovec descriptor straddles over the block-size limitation.
1317         * This makes us our job easier to stage the I/O.
1318         * In addition, this function will also compute an array with seg_count
1319         * entries that will store the number of segments that straddle the
1320         * block-size boundaries.
1321         */
1322        ret = split_iovecs(max_new_nr_segs, /* IN */
1323                           nr_segs,         /* IN */
1324                           iov,             /* IN */
1325                           &new_nr_segs,    /* OUT */
1326                           &iovecptr,       /* OUT */
1327                           &seg_count,      /* OUT */
1328                           &seg_array);     /* OUT */
1329        if(ret < 0)
1330        {
1331            gossip_err("%s: Failed to split iovecs to satisfy larger "
1332                       " than blocksize readv/writev request %zd\n", rw->fnstr
1333                                                                   , ret);
1334            goto out;
1335        }
1336        gossip_debug(GOSSIP_FILE_DEBUG, "%s: Splitting iovecs from %lu to %lu"
1337                                        " [max_new %lu]\n",
1338                rw->fnstr, nr_segs, new_nr_segs, max_new_nr_segs);
1339        /* We must free seg_array and iovecptr */
1340        to_free = 1;
1341    }
1342    else
1343    {
1344        new_nr_segs = nr_segs;
1345        /* use the given iovec description */
1346        iovecptr = (struct iovec *) iov;
1347        /* There is only 1 element in the seg_array */
1348        seg_count = 1;
1349        /* and its value is the number of segments passed in */
1350        seg_array = &nr_segs;
1351        /* We dont have to free up anything */
1352        to_free = 0;
1353    }
1354    ptr = iovecptr;
1355
1356    gossip_debug(GOSSIP_FILE_DEBUG, "%s %zd@%llu\n",
1357            rw->fnstr, count, llu(*offset));
1358    gossip_debug(GOSSIP_FILE_DEBUG, "%s: new_nr_segs: %lu, seg_count: %lu\n",
1359            rw->fnstr, new_nr_segs, seg_count);
1360#ifdef PVFS2_KERNEL_DEBUG
1361    for (seg = 0; seg < new_nr_segs; seg++)
1362    {
1363        gossip_debug(GOSSIP_FILE_DEBUG, "%s: %d) %p to %p [%d bytes]\n",
1364                rw->fnstr,
1365                (int)seg + 1, iovecptr[seg].iov_base,
1366                iovecptr[seg].iov_base + iovecptr[seg].iov_len,
1367                (int) iovecptr[seg].iov_len);
1368    }
1369    for (seg = 0; seg < seg_count; seg++)
1370    {
1371        gossip_debug(GOSSIP_FILE_DEBUG, "%s: %zd) %lu\n",
1372                rw->fnstr, seg + 1, seg_array[seg]);
1373   }
1374#endif
1375    seg = 0;   
1376    while (total_count < count)
1377    {
1378        size_t each_count, amt_complete;
1379
1380        /* how much to transfer in this loop iteration */
1381        each_count = (((count - total_count) > pvfs_bufmap_size_query()) ?
1382                      pvfs_bufmap_size_query() : (count - total_count));
1383#ifndef PVFS2_LINUX_KERNEL_2_4
1384        /* caching is not working properly. removing functionality for now.  Becky Ligon. */
1385        /* caching REQUIRES the user's buffer to be a multiple of 4096; the code breaks if */
1386        /* it is not!                                                                      */
1387
1388        /* if a file is immutable, stage its I/O
1389         * through the cache */
1390        //if (IS_IMMUTABLE(rw->inode)) {
1391            /* Stage the I/O through the kernel's pagecache */
1392        //    ret = wait_for_cached_io(rw, ptr, seg_array[seg], each_count);
1393        //}
1394        //else
1395#endif /* PVFS2_LINUX_KERNEL_2_4 */
1396        //{
1397            /* push the I/O directly through to storage */
1398     ret = wait_for_direct_io(rw, ptr, seg_array[seg], each_count);
1399        //}
1400        if (ret < 0)
1401        {
1402            goto out;
1403        }
1404        /* advance the iovec pointer */
1405        ptr += seg_array[seg];
1406        seg++;
1407        *offset += ret;
1408        total_count += ret;
1409        amt_complete = ret;
1410
1411        /* if we got a short I/O operations,
1412         * fall out and return what we got so far
1413         */
1414        if (amt_complete < each_count)
1415        {
1416            break;
1417        }
1418    }
1419    if (total_count > 0)
1420    {
1421        ret = total_count;
1422    }
1423out:
1424    if (to_free)
1425    {
1426        kfree(iovecptr);
1427        kfree(seg_array);
1428    }
1429    if (ret > 0 && inode != NULL && pvfs2_inode != NULL)
1430    {
1431        if (rw->type == IO_READV)
1432        {
1433            SetAtimeFlag(pvfs2_inode);
1434            inode->i_atime = CURRENT_TIME;
1435        }
1436        else
1437        {
1438            SetMtimeFlag(pvfs2_inode);
1439            inode->i_mtime = CURRENT_TIME;
1440        }
1441        mark_inode_dirty_sync(inode);
1442    }
1443    return ret;
1444}
1445
1446/** Read data from a specified offset in a file (referenced by inode).
1447 *  Data may be placed either in a user or kernel buffer.
1448 */
1449ssize_t pvfs2_inode_read(
1450    struct inode *inode,
1451    char __user *buf,
1452    size_t count,
1453    loff_t *offset,
1454    int copy_to_user,
1455    loff_t readahead_size)
1456{
1457    struct rw_options rw;
1458    struct iovec vec;
1459
1460    memset(&rw, 0, sizeof(rw));
1461    rw.async = 0;
1462    rw.type = IO_READ;
1463    rw.copy_dest_type = COPY_DEST_ADDRESSES;
1464    rw.readahead_size = readahead_size;
1465    rw.copy_to_user_addresses = copy_to_user;
1466    rw.fnstr = __FUNCTION__;
1467    vec.iov_base = buf;
1468    vec.iov_len  = count;
1469    rw.inode = inode;
1470    rw.pvfs2_inode = PVFS2_I(inode);
1471    rw.file = NULL;
1472    rw.dest.address.iov = &vec;
1473    rw.dest.address.nr_segs = 1;
1474    rw.off.io.offset = offset;
1475    g_pvfs2_stats.reads++;
1476    return do_readv_writev(&rw);
1477}
1478
1479/** Read data from a specified offset in a file into a user buffer.
1480 */
1481ssize_t pvfs2_file_read(
1482    struct file *file,
1483    char __user *buf,
1484    size_t count,
1485    loff_t *offset)
1486{
1487    struct rw_options rw;
1488    struct iovec vec;
1489
1490    gossip_debug(GOSSIP_IO_DEBUG,"pvfs2_file_read: count=%zd \toffset=%lld\n"
1491               ,count
1492               ,(long long)*offset);
1493
1494
1495    memset(&rw, 0, sizeof(rw));
1496    rw.async = 0;
1497    rw.type = IO_READ;
1498    rw.copy_dest_type = COPY_DEST_ADDRESSES;
1499    rw.copy_to_user_addresses = 1;
1500    rw.fnstr = __FUNCTION__;
1501    vec.iov_base = buf;
1502    vec.iov_len = count;
1503    rw.inode = file->f_dentry->d_inode;
1504    rw.pvfs2_inode = PVFS2_I(rw.inode);
1505    rw.file = file;
1506    rw.dest.address.iov = &vec;
1507    rw.dest.address.nr_segs = 1;
1508    rw.off.io.offset = offset;
1509
1510    rw.readahead_size = 0;
1511    g_pvfs2_stats.reads++;
1512
1513    return do_readv_writev(&rw);
1514}
1515
1516/** Write data from a contiguous user buffer into a file at a specified
1517 *  offset.
1518 */
1519static ssize_t pvfs2_file_write(
1520    struct file *file,
1521    const char __user *buf,
1522    size_t count,
1523    loff_t *offset)
1524{
1525    struct rw_options rw;
1526    struct iovec vec;
1527
1528    memset(&rw, 0, sizeof(rw));
1529    rw.async = 0;
1530    rw.type = IO_WRITE;
1531    rw.copy_dest_type = COPY_DEST_ADDRESSES;
1532    rw.readahead_size = 0;
1533    rw.copy_to_user_addresses = 1;
1534    rw.fnstr = __FUNCTION__;
1535    vec.iov_base  = (char *) buf;
1536    vec.iov_len   = count;
1537    rw.file = file;
1538    rw.inode = file->f_dentry->d_inode;
1539    rw.pvfs2_inode = PVFS2_I(rw.inode);
1540    rw.dest.address.iov = &vec;
1541    rw.dest.address.nr_segs = 1;
1542    rw.off.io.offset = offset;
1543    g_pvfs2_stats.writes++;
1544    return do_readv_writev(&rw);
1545}
1546
1547/* compat code, < 2.6.19 */
1548#ifndef HAVE_COMBINED_AIO_AND_VECTOR
1549/** Reads data to several contiguous user buffers (an iovec) from a file at a
1550 * specified offset.
1551 */
1552static ssize_t pvfs2_file_readv(
1553    struct file *file,
1554    const struct iovec *iov,
1555    unsigned long nr_segs,
1556    loff_t *offset)
1557{
1558    struct rw_options rw;
1559
1560    memset(&rw, 0, sizeof(rw));
1561    rw.async = 0;
1562    rw.type = IO_READV;
1563    rw.copy_dest_type = COPY_DEST_ADDRESSES;
1564    rw.copy_to_user_addresses = 1;
1565    rw.fnstr = __FUNCTION__;
1566    rw.inode = file->f_dentry->d_inode;
1567    rw.pvfs2_inode = PVFS2_I(rw.inode);
1568    rw.file  = file;
1569    rw.dest.address.iov = (struct iovec *) iov;
1570    rw.dest.address.nr_segs = nr_segs;
1571    rw.off.io.offset = offset;
1572    rw.readahead_size = 0;
1573    g_pvfs2_stats.reads++;
1574    return do_readv_writev(&rw);
1575}
1576
1577/** Write data from a several contiguous user buffers (an iovec) into a file at
1578 * a specified offset.
1579 */
1580static ssize_t pvfs2_file_writev(
1581    struct file *file,
1582    const struct iovec *iov,
1583    unsigned long nr_segs,
1584    loff_t *offset)
1585{
1586    struct rw_options rw;
1587
1588    memset(&rw, 0, sizeof(rw));
1589    rw.async = 0;
1590    rw.type = IO_WRITEV;
1591    rw.copy_dest_type = COPY_DEST_ADDRESSES;
1592    rw.readahead_size = 0;
1593    rw.copy_to_user_addresses = 1;
1594    rw.fnstr = __FUNCTION__;
1595    rw.file = file;
1596    rw.inode = file->f_dentry->d_inode;
1597    rw.pvfs2_inode = PVFS2_I(rw.inode);
1598    rw.dest.address.iov = (struct iovec *) iov;
1599    rw.dest.address.nr_segs = nr_segs;
1600    rw.off.io.offset = offset;
1601
1602    g_pvfs2_stats.writes++;
1603    return do_readv_writev(&rw);
1604}
1605#endif
1606
1607
1608/* Construct a trailer of <file offsets, length pairs> in a buffer that we
1609 * pass in as an upcall trailer to client-core. This is used by clientcore
1610 * to construct a Request_hindexed type to stage the non-contiguous I/O
1611 * to file
1612 */
1613static int construct_file_offset_trailer(char **trailer,
1614        PVFS_size *trailer_size, int seg_count, struct xtvec *xptr)
1615{
1616    int i;
1617    struct read_write_x *rwx;
1618
1619    *trailer_size = seg_count * sizeof(*rwx);
1620    *trailer = (char *) vmalloc(*trailer_size);
1621    if (*trailer == NULL)
1622    {
1623        *trailer_size = 0;
1624        return -ENOMEM;
1625    }
1626    rwx = (struct read_write_x *) *trailer;
1627    for (i = 0; i < seg_count; i++)
1628    {
1629        rwx->off = xptr[i].xtv_off;
1630        rwx->len = xptr[i].xtv_len;
1631        rwx++;
1632    }
1633    return 0;
1634}
1635
1636/*
1637 * The reason we need to do this is to be able to support readx() and writex()
1638 * of larger than (pvfs_bufmap_size_query())
1639 * (default is PVFS2_BUFMAP_DEFAULT_DESC_SIZE MB).
1640 * What that means is that
1641 * we will create a new xtvec descriptor for those file offsets that
1642 * go beyond the limit
1643 * Return value for this routine is -ve in case of errors
1644 * and 0 in case of success.
1645 * Further, the new_nr_segs pointer is updated to hold the new value
1646 * of number of xtvecs, the new_xtvec pointer is updated to hold the pointer
1647 * to the new split xtvec, and the size array is an array of integers holding
1648 * the number of xtvecs that straddle (pvfs_bufmap_size_query()).
1649 * The max_new_nr_segs value is computed by the caller and passed in.
1650 * (It will be (count of all xtv_len/ block_size) + 1).
1651 */
1652static int split_xtvecs(
1653                unsigned long max_new_nr_segs,      /* IN */
1654                unsigned long nr_segs,              /* IN */
1655                const struct xtvec *original_xtvec, /* IN */
1656                unsigned long *new_nr_segs,         /* OUT */
1657                struct xtvec **new_vec,             /* OUT */
1658                unsigned long *seg_count,           /* OUT */
1659                unsigned long **seg_array)          /* OUT */
1660{
1661    unsigned long seg, count, begin_seg, tmpnew_nr_segs;
1662    struct xtvec *new_xtvec = NULL, *orig_xtvec;
1663    unsigned long *sizes = NULL, sizes_count = 0;
1664
1665    if (nr_segs <= 0 || original_xtvec == NULL
1666            || new_nr_segs == NULL || new_vec == NULL
1667            || seg_count == NULL || seg_array == NULL || max_new_nr_segs <= 0)
1668    {
1669        gossip_err("Invalid parameters to split_xtvecs\n");
1670        return -EINVAL;
1671    }
1672    *new_nr_segs = 0;
1673    *new_vec = NULL;
1674    *seg_count = 0;
1675    *seg_array = NULL;
1676    /* copy the passed in xtvec descriptor to a temp structure */
1677    orig_xtvec = kmalloc(nr_segs * sizeof(*orig_xtvec), PVFS2_BUFMAP_GFP_FLAGS);
1678    if (orig_xtvec == NULL)
1679    {
1680        gossip_err("split_xtvecs: Could not allocate memory for %lu bytes!\n",
1681                (unsigned long)(nr_segs * sizeof(*orig_xtvec)));
1682        return -ENOMEM;
1683    }
1684    new_xtvec = kzalloc(max_new_nr_segs * sizeof(*new_xtvec),
1685            PVFS2_BUFMAP_GFP_FLAGS);
1686    if (new_xtvec == NULL)
1687    {
1688        kfree(orig_xtvec);
1689        gossip_err("split_xtvecs: Could not allocate memory for %lu bytes!\n",
1690                (unsigned long)(max_new_nr_segs * sizeof(*new_xtvec)));
1691        return -ENOMEM;
1692    }
1693    sizes = kzalloc(max_new_nr_segs * sizeof(*sizes), PVFS2_BUFMAP_GFP_FLAGS);
1694    if (sizes == NULL)
1695    {
1696        kfree(new_xtvec);
1697        kfree(orig_xtvec);
1698        gossip_err("split_xtvecs: Could not allocate memory for %lu bytes!\n",
1699                (unsigned long)(max_new_nr_segs * sizeof(*sizes)));
1700        return -ENOMEM;
1701    }
1702    /* copy the passed in xtvec to a temp structure */
1703    memcpy(orig_xtvec, original_xtvec, nr_segs * sizeof(*orig_xtvec));
1704    begin_seg = 0;
1705    count = 0;
1706    tmpnew_nr_segs = 0;
1707repeat:
1708    for (seg = begin_seg; seg < nr_segs; seg++)
1709    {
1710        if (tmpnew_nr_segs >= max_new_nr_segs || sizes_count >= max_new_nr_segs)
1711        {
1712            kfree(sizes);
1713            kfree(orig_xtvec);
1714            kfree(new_xtvec);
1715            gossip_err("split_xtvecs: exceeded the index limit (%lu)\n",
1716                            tmpnew_nr_segs);
1717            return -EINVAL;
1718        }
1719        if (count + orig_xtvec[seg].xtv_len < pvfs_bufmap_size_query())
1720        {
1721            count += orig_xtvec[seg].xtv_len;
1722           
1723            memcpy(&new_xtvec[tmpnew_nr_segs], &orig_xtvec[seg],
1724                    sizeof(*new_xtvec));
1725            tmpnew_nr_segs++;
1726            sizes[sizes_count]++;
1727        }
1728        else
1729        {
1730            new_xtvec[tmpnew_nr_segs].xtv_off = orig_xtvec[seg].xtv_off;
1731            new_xtvec[tmpnew_nr_segs].xtv_len =
1732                (pvfs_bufmap_size_query() - count);
1733            tmpnew_nr_segs++;
1734            sizes[sizes_count]++;
1735            sizes_count++;
1736            begin_seg = seg;
1737            orig_xtvec[seg].xtv_off += (pvfs_bufmap_size_query() - count);
1738            orig_xtvec[seg].xtv_len -= (pvfs_bufmap_size_query() - count);
1739            count = 0;
1740            break;
1741        }
1742    }
1743    if (seg != nr_segs) {
1744        goto repeat;
1745    }
1746    else
1747    {
1748        sizes_count++;
1749    }
1750    *new_nr_segs = tmpnew_nr_segs;
1751    /* new_xtvec is freed by the caller */
1752    *new_vec = new_xtvec;
1753    *seg_count = sizes_count;
1754    /* seg_array is also freed by the caller */
1755    *seg_array = sizes;
1756    kfree(orig_xtvec);
1757    return 0;
1758}
1759
1760static long
1761bound_max_xtvecs(const struct xtvec *curr, unsigned long nr_segs, size_t *total_count)
1762{
1763    unsigned long i;
1764    long max_nr_xtvecs;
1765    size_t total, count;
1766
1767    total = 0;
1768    count = 0;
1769    max_nr_xtvecs = 0;
1770    for (i = 0; i < nr_segs; i++)
1771    {
1772        const struct xtvec *xv = &curr[i];
1773        count += xv->xtv_len;
1774        if (unlikely((ssize_t)(count|xv->xtv_len) < 0))
1775            return -EINVAL;
1776        if (total + xv->xtv_len < pvfs_bufmap_size_query())
1777        {
1778            total += xv->xtv_len;
1779            max_nr_xtvecs++;
1780        }
1781        else
1782        {
1783            total = (total + xv->xtv_len - pvfs_bufmap_size_query());
1784            max_nr_xtvecs += (total / pvfs_bufmap_size_query() + 2);
1785        }
1786    }
1787    *total_count = count;
1788    return max_nr_xtvecs;
1789}
1790
1791/*
1792 * Post and wait for the I/O upcall to finish.
1793 * @rw  - contains state information to initiate the I/O operation
1794 * @vec - contains the memory regions
1795 * @nr_segs - number of memory vector regions
1796 * @xtvec - contains the file regions
1797 * @xtnr_segs - number of file vector regions
1798 */
1799static ssize_t wait_for_iox(struct rw_options *rw,
1800                            struct iovec *vec,
1801                            unsigned long nr_segs,
1802                            struct xtvec *xtvec,
1803                            unsigned long xtnr_segs,
1804                            size_t total_size)
1805{
1806    pvfs2_kernel_op_t *new_op = NULL;
1807    int buffer_index = -1;
1808    ssize_t ret;
1809
1810    if (!rw || !vec || nr_segs < 0 || total_size <= 0
1811            || !xtvec || xtnr_segs < 0)
1812    {
1813        gossip_lerr("invalid parameters (rw: %p, vec: %p, nr_segs: %lu, "
1814                "xtvec %p, xtnr_segs %lu, total_size: %zd\n", rw, vec, nr_segs,
1815                xtvec, xtnr_segs, total_size);
1816        ret = -EINVAL;
1817        goto out;
1818    }
1819    if (!rw->pvfs2_inode || !rw->inode || !rw->fnstr)
1820    {
1821        gossip_lerr("invalid parameters (pvfs2_inode: %p, inode: %p, fnstr: %p\n",
1822                rw->pvfs2_inode, rw->inode, rw->fnstr);
1823        ret = -EINVAL;
1824        goto out;
1825    }
1826    new_op = op_alloc_trailer(PVFS2_VFS_OP_FILE_IOX);
1827    if (!new_op)
1828    {
1829        ret = -ENOMEM;
1830        goto out;
1831    }
1832    new_op->upcall.req.iox.io_type =
1833        (rw->type == IO_READX) ? PVFS_IO_READ : PVFS_IO_WRITE;
1834    new_op->upcall.req.iox.refn = rw->pvfs2_inode->refn;
1835
1836    /* get a shared buffer index */
1837    ret = pvfs_bufmap_get(&buffer_index);
1838    if (ret < 0)
1839    {
1840        gossip_debug(GOSSIP_FILE_DEBUG, "%s: pvfs_bufmap_get() "
1841                    "failure (%ld)\n", rw->fnstr, (long) ret);
1842        goto out;
1843    }
1844    new_op->upcall.req.iox.buf_index = buffer_index;
1845    new_op->upcall.req.iox.count     = total_size;
1846    /* construct the upcall trailer buffer */
1847    if ((ret = construct_file_offset_trailer(&new_op->upcall.trailer_buf,
1848                    &new_op->upcall.trailer_size, xtnr_segs, xtvec)) < 0)
1849    {
1850        gossip_err("%s: construct_file_offset_trailer "
1851                "failure (%ld)\n", rw->fnstr, (long) ret);
1852        goto out;
1853    }
1854    gossip_debug(GOSSIP_FILE_DEBUG, "%s: copy_to_user %d nr_segs %lu, "
1855            "xtnr_segs: %lu "
1856            "total_size: %zd "
1857            "copy_dst_type %d\n",
1858            rw->fnstr, rw->copy_to_user_addresses,
1859            nr_segs, xtnr_segs,
1860            total_size, rw->copy_dest_type);
1861
1862    /* Stage 1: Copy in buffers */
1863    if ((ret = precopy_buffers(buffer_index, rw, vec, nr_segs, total_size)) < 0) {
1864        goto out;
1865    }
1866    /* Stage 2: whew! finally service this operation */
1867    ret = service_operation(new_op, rw->fnstr,
1868            get_interruptible_flag(rw->inode));
1869    if (ret < 0)
1870    {
1871          /* this macro is defined in pvfs2-kernel.h */
1872          handle_io_error();
1873
1874          /*
1875            don't write an error to syslog on signaled operation
1876            termination unless we've got debugging turned on, as
1877            this can happen regularly (i.e. ctrl-c)
1878          */
1879          if (ret == -EINTR)
1880          {
1881              gossip_debug(GOSSIP_FILE_DEBUG, "%s: returning error %ld\n",
1882                      rw->fnstr, (long) ret);
1883          }
1884          else
1885          {
1886              gossip_err(
1887                "%s: error in %s handle %llu, "
1888                "FILE: %s\n  -- returning %ld\n",
1889                rw->fnstr,
1890                rw->type == IO_READX ? "noncontig read from" : "noncontig write to",
1891                llu(get_handle_from_ino(rw->inode)),
1892                (rw->file && rw->file->f_dentry && rw->file->f_dentry->d_name.name ?
1893                     (char *) rw->file->f_dentry->d_name.name : "UNKNOWN"),
1894                    (long) ret);
1895          }
1896          goto out;
1897    }
1898    gossip_debug(GOSSIP_FILE_DEBUG, "downcall returned %lld\n",
1899            llu(new_op->downcall.resp.iox.amt_complete));
1900    /* Stage 3: Post copy buffers */
1901    if ((ret = postcopy_buffers(buffer_index, rw, vec, nr_segs,
1902                    new_op->downcall.resp.iox.amt_complete)) < 0) {
1903        /* put error codes in downcall so that handle_io_error()
1904         * preserves it properly */
1905        new_op->downcall.status = ret;
1906        handle_io_error();
1907        goto out;
1908    }
1909    ret = new_op->downcall.resp.iox.amt_complete;
1910    gossip_debug(GOSSIP_FILE_DEBUG, "wait_for_iox returning %ld\n", (long) ret);
1911     /*
1912      tell the device file owner waiting on I/O that this I/O has
1913      completed and it can return now.  in this exact case, on
1914      wakeup the device will free the op, so we *cannot* touch it
1915      after this.
1916    */
1917    wake_up_daemon_for_return(new_op);
1918    new_op = NULL;
1919out:
1920    if (buffer_index >= 0)
1921    {
1922        pvfs_bufmap_put(buffer_index);
1923        gossip_debug(GOSSIP_FILE_DEBUG, "PUT buffer_index %d\n", buffer_index);
1924        buffer_index = -1;
1925    }
1926    if (new_op)
1927    {
1928        if (new_op->upcall.trailer_buf)
1929            vfree(new_op->upcall.trailer_buf);
1930        op_release(new_op);
1931        new_op = NULL;
1932    }
1933    return ret;
1934}
1935
1936static ssize_t do_readx_writex(struct rw_options *rw)
1937{
1938    ssize_t ret, total_count;
1939    size_t count_mem, count_stream;
1940    struct inode *inode = NULL;
1941    pvfs2_inode_t *pvfs2_inode = NULL;
1942    unsigned int to_free;
1943    const struct iovec *iov;
1944    unsigned long seg, nr_segs, xtnr_segs;
1945    struct xtvec *xtvec;
1946    unsigned long max_new_nr_segs_mem, max_new_nr_segs_stream;
1947    unsigned long new_nr_segs_mem = 0, new_nr_segs_stream = 0;
1948    unsigned long seg_count_mem, *seg_array_mem = NULL;
1949    unsigned long seg_count_stream, *seg_array_stream = NULL;
1950    struct iovec *iovecptr = NULL, *ptr = NULL;
1951    struct xtvec *xtvecptr = NULL, *xptr = NULL;
1952
1953    total_count = 0;
1954    ret = -EINVAL;
1955    to_free = 0;
1956    inode = NULL;
1957    count_mem = 0;
1958    max_new_nr_segs_mem = 0;
1959    count_stream = 0;
1960    max_new_nr_segs_stream = 0;
1961
1962    if (!rw || !rw->fnstr)
1963    {
1964        gossip_lerr("Invalid parameters\n");
1965        goto out;
1966    }
1967    inode = rw->inode;
1968    if (!inode)
1969    {
1970        gossip_err("%s: invalid inode\n", rw->fnstr);
1971        goto out;
1972    }
1973    pvfs2_inode = rw->pvfs2_inode;
1974    if (!pvfs2_inode)
1975    {
1976        gossip_err("%s: Invalid pvfs2 inode\n", rw->fnstr);
1977        goto out;
1978    }
1979    iov  = rw->dest.address.iov;
1980    nr_segs = rw->dest.address.nr_segs;
1981    if (iov == NULL || nr_segs < 0)
1982    {
1983        gossip_err("%s: Invalid iovec %p or nr_segs %lu\n",
1984                rw->fnstr, iov, nr_segs);
1985        goto out;
1986    }
1987    /* Compute total and max number of segments after split of the memory vector */
1988    if ((max_new_nr_segs_mem = bound_max_iovecs(iov, nr_segs, &count_mem)) < 0)
1989    {
1990        gossip_lerr("%s: could not bound iovec %lu\n", rw->fnstr, max_new_nr_segs_mem);
1991        goto out;
1992    }
1993    xtvec = rw->off.iox.xtvec;
1994    xtnr_segs = rw->off.iox.xtnr_segs;
1995    if (xtvec == NULL || xtnr_segs < 0)
1996    {
1997        gossip_err("%s: Invalid xtvec %p or xtnr_segs %lu\n",
1998                rw->fnstr, xtvec, xtnr_segs);
1999        goto out;
2000    }
2001    /* Calculate the total stream length amd max segments after split of the stream vector */
2002    if ((max_new_nr_segs_stream = bound_max_xtvecs(xtvec, xtnr_segs, &count_stream)) < 0)
2003    {
2004        gossip_lerr("%s: could not bound xtvec %lu\n", rw->fnstr, max_new_nr_segs_stream);
2005        goto out;
2006    }
2007    if (count_mem == 0)
2008    {
2009        return 0;
2010    }
2011    if (count_mem != count_stream)
2012    {
2013        gossip_err("%s: mem count %ld != stream count %ld\n",
2014                rw->fnstr, (long) count_mem, (long) count_stream);
2015        goto out;
2016    }
2017    /*
2018     * if the total size of data transfer requested is greater than
2019     * the kernel-set blocksize of PVFS2, then we split the iovecs
2020     * such that no iovec description straddles a block size limit
2021     */
2022    if (count_mem > pvfs_bufmap_size_query())
2023    {
2024        /*
2025         * Split up the given iovec description such that
2026         * no iovec descriptor straddles over the block-size limitation.
2027         * This makes us our job easier to stage the I/O.
2028         * In addition, this function will also compute an array with seg_count
2029         * entries that will store the number of segments that straddle the
2030         * block-size boundaries.
2031         */
2032        ret = split_iovecs(max_new_nr_segs_mem, /* IN */
2033                           nr_segs,             /* IN */
2034                           iov,                 /* IN */
2035                           &new_nr_segs_mem,    /* OUT */
2036                           &iovecptr,           /* OUT */
2037                           &seg_count_mem,      /* OUT */
2038                           &seg_array_mem);     /* OUT */
2039        if(ret < 0)
2040        {
2041            gossip_err("%s: Failed to split iovecs to satisfy larger "
2042                    " than blocksize readx request %ld\n", rw->fnstr, (long) ret);
2043            goto out;
2044        }
2045        /* We must free seg_array_mem and iovecptr, xtvecptr and seg_array_stream */
2046        to_free = 1;
2047        gossip_debug(GOSSIP_FILE_DEBUG, "%s: Splitting iovecs from %lu to %lu [max_new %lu]\n",
2048                rw->fnstr, nr_segs, new_nr_segs_mem, max_new_nr_segs_mem);
2049        /*
2050         * Split up the given xtvec description such that
2051         * no xtvec descriptor straddles over the block-size limitation.
2052         */
2053        ret = split_xtvecs(max_new_nr_segs_stream, /* IN */
2054                           xtnr_segs,              /* IN */
2055                           xtvec,                  /* IN */
2056                           &new_nr_segs_stream,    /* OUT */
2057                           &xtvecptr,              /* OUT */
2058                           &seg_count_stream,      /* OUT */
2059                           &seg_array_stream);     /* OUT */
2060        if(ret < 0)
2061        {
2062            gossip_err("Failed to split iovecs to satisfy larger "
2063                    " than blocksize readx request %ld\n", (long) ret);
2064            goto out;
2065        }
2066        gossip_debug(GOSSIP_FILE_DEBUG, "%s: Splitting xtvecs from %lu to %lu [max_new %lu]\n",
2067                rw->fnstr, xtnr_segs, new_nr_segs_stream, max_new_nr_segs_stream);
2068    }
2069    else
2070    {
2071        new_nr_segs_mem = nr_segs;
2072        /* use the given iovec description */
2073        iovecptr = (struct iovec *) iov;
2074        /* There is only 1 element in the seg_array_mem */
2075        seg_count_mem = 1;
2076        /* and its value is the number of segments passed in */
2077        seg_array_mem = &nr_segs;
2078       
2079        new_nr_segs_stream = xtnr_segs;
2080        /* use the given file description */
2081        xtvecptr = (struct xtvec *) xtvec;
2082        /* There is only 1 element in the seg_array_stream */
2083        seg_count_stream = 1;
2084        /* and its value is the number of segments passed in */
2085        seg_array_stream = &xtnr_segs;
2086        /* We dont have to free up anything */
2087        to_free = 0;
2088    }
2089#ifdef PVFS2_KERNEL_DEBUG
2090    for (seg = 0; seg < new_nr_segs_mem; seg++)
2091    {
2092        gossip_debug(GOSSIP_FILE_DEBUG, "%s: %d) %p to %p [%ld bytes]\n",
2093                rw->fnstr,
2094                seg + 1, iovecptr[seg].iov_base,
2095                iovecptr[seg].iov_base + iovecptr[seg].iov_len,
2096                (long) iovecptr[seg].iov_len);
2097    }
2098    for (seg = 0; seg < new_nr_segs_stream; seg++)
2099    {
2100        gossip_debug(GOSSIP_FILE_DEBUG, "%s: %d) %ld to %ld [%ld bytes]\n",
2101                rw->fnstr,
2102                seg + 1, (long) xtvecptr[seg].xtv_off,
2103                (long) xtvecptr[seg].xtv_off + xtvecptr[seg].xtv_len,
2104                (long) xtvecptr[seg].xtv_len);
2105    }
2106#endif
2107    seg = 0;
2108    ptr = iovecptr;
2109    xptr = xtvecptr;
2110
2111    while (total_count < count_mem)
2112    {
2113        size_t  each_count, amt_complete;
2114
2115        /* how much to transfer in this loop iteration */
2116        each_count = (((count_mem - total_count) > pvfs_bufmap_size_query()) ?
2117                      pvfs_bufmap_size_query() : (count_mem - total_count));
2118        /* and push the I/O directly through to the servers */
2119        ret = wait_for_iox(rw, ptr, seg_array_mem[seg],
2120                xptr, seg_array_stream[seg], each_count);
2121        if (ret < 0)
2122        {
2123            goto out;
2124        }
2125        /* Advance the iovec pointer */
2126        ptr += seg_array_mem[seg];
2127        /* Advance the xtvec pointer */
2128        xptr += seg_array_stream[seg];
2129        seg++;
2130        total_count += ret;
2131        amt_complete = ret;
2132        /* if we got a short I/O operations,
2133         * fall out and return what we got so far
2134         */
2135        if (amt_complete < each_count)
2136        {
2137            break;
2138        }
2139    }
2140    if (total_count > 0)
2141    {
2142        ret = total_count;
2143    }
2144out:
2145    if (to_free)
2146    {
2147        kfree(iovecptr);
2148        kfree(seg_array_mem);
2149        kfree(xtvecptr);
2150        kfree(seg_array_stream);
2151    }
2152    if (ret > 0 && inode != NULL && pvfs2_inode != NULL)
2153    {
2154        if (rw->type == IO_READX)
2155        {
2156            SetAtimeFlag(pvfs2_inode);
2157            inode->i_atime = CURRENT_TIME;
2158        }
2159        else
2160        {
2161            SetMtimeFlag(pvfs2_inode);
2162            inode->i_mtime = CURRENT_TIME;
2163        }
2164        mark_inode_dirty_sync(inode);
2165    }
2166    return ret;
2167}
2168
2169#ifndef HAVE_READX_FILE_OPERATIONS
2170static ssize_t pvfs2_file_readx(
2171    struct file *file,
2172    const struct iovec *iov,
2173    unsigned long nr_segs,
2174    const struct xtvec *xtvec,
2175    unsigned long xtnr_segs) __attribute__((unused));
2176#endif
2177static ssize_t pvfs2_file_readx(
2178    struct file *file,
2179    const struct iovec *iov,
2180    unsigned long nr_segs,
2181    const struct xtvec *xtvec,
2182    unsigned long xtnr_segs)
2183{
2184    struct rw_options rw;
2185
2186    memset(&rw, 0, sizeof(rw));
2187    rw.async = 0;
2188    rw.type = IO_READX;
2189    rw.copy_dest_type = COPY_DEST_ADDRESSES;
2190    rw.copy_to_user_addresses = 1;
2191    rw.fnstr = __FUNCTION__;
2192    rw.inode = file->f_dentry->d_inode;
2193    rw.pvfs2_inode = PVFS2_I(rw.inode);
2194    rw.file  = file;
2195    rw.dest.address.iov = (struct iovec *) iov;
2196    rw.dest.address.nr_segs = nr_segs;
2197    rw.off.iox.xtvec = (struct xtvec *) xtvec;
2198    rw.off.iox.xtnr_segs = xtnr_segs;
2199    g_pvfs2_stats.reads++;
2200    return do_readx_writex(&rw);
2201}
2202
2203#ifndef HAVE_WRITEX_FILE_OPERATIONS
2204static ssize_t pvfs2_file_writex(
2205    struct file *file,
2206    const struct iovec *iov,
2207    unsigned long nr_segs,
2208    const struct xtvec *xtvec,
2209    unsigned long xtnr_segs) __attribute__((unused));
2210#endif
2211static ssize_t pvfs2_file_writex(
2212    struct file *file,
2213    const struct iovec *iov,
2214    unsigned long nr_segs,
2215    const struct xtvec *xtvec,
2216    unsigned long xtnr_segs)
2217{
2218    struct rw_options rw;
2219
2220    memset(&rw, 0, sizeof(rw));
2221    rw.async = 0;
2222    rw.type = IO_WRITEX;
2223    rw.copy_dest_type = COPY_DEST_ADDRESSES;
2224    rw.copy_to_user_addresses = 1;
2225    rw.fnstr = __FUNCTION__;
2226    rw.inode = file->f_dentry->d_inode;
2227    rw.pvfs2_inode = PVFS2_I(rw.inode);
2228    rw.file  = file;
2229    rw.dest.address.iov = (struct iovec *) iov;
2230    rw.dest.address.nr_segs = nr_segs;
2231    rw.off.iox.xtvec = (struct xtvec *) xtvec;
2232    rw.off.iox.xtnr_segs = xtnr_segs;
2233    g_pvfs2_stats.writes++;
2234    return do_readx_writex(&rw);
2235}
2236
2237#ifdef HAVE_AIO_VFS_SUPPORT
2238/*
2239 * NOTES on the aio implementation.
2240 * Conceivably, we could just make use of the
2241 * generic_aio_file_read/generic_aio_file_write
2242 * functions that stage the read/write through
2243 * the page-cache. But given that we are not
2244 * interested in staging anything thru the page-cache,
2245 * we are going to resort to another
2246 * design.
2247 *
2248 * The aio callbacks to be implemented at the f.s. level
2249 * are fairly straightforward. All we see at this level
2250 * are individual
2251 * contiguous file block reads/writes. This means that
2252 * we can just make use
2253 * of the current set of I/O upcalls without too many
2254 * modifications. (All we need is an extra flag for sync/async)
2255 *
2256 * However, we do need to handle cancellations properly.
2257 * What this means
2258 * is that the "ki_cancel" callback function must be set so
2259 * that the kernel calls
2260 * us back with the kiocb structure for proper cancellation.
2261 * This way we can send appropriate upcalls
2262 * to cancel I/O operations if need be and copy status/results
2263 * back to user-space.
2264 */
2265
2266/*
2267 * This is the retry routine called by the AIO core to
2268 * try and see if the
2269 * I/O operation submitted earlier can be completed
2270 * atleast now :)
2271 * We can use copy_*() functions here because the kaio
2272 * threads do a use_mm() and assume the memory context of
2273 * the user-program that initiated the aio(). whew,
2274 * that's a big relief.
2275 */
2276static ssize_t pvfs2_aio_retry(struct kiocb *iocb)
2277{
2278    pvfs2_kiocb *x = NULL;
2279    pvfs2_kernel_op_t *op = NULL;
2280    ssize_t error = 0;
2281
2282    if ((x = (pvfs2_kiocb *) iocb->private) == NULL)
2283    {
2284        gossip_err("pvfs2_aio_retry: could not "
2285                " retrieve pvfs2_kiocb!\n");
2286        return -EINVAL;
2287    }
2288    /* highly unlikely, but somehow paranoid need for checking */
2289    if (((op = x->op) == NULL)
2290            || x->kiocb != iocb
2291            || x->buffer_index < 0)
2292    {
2293        /*
2294         * Well, if this happens, we are toast!
2295         * What should we cleanup if such a thing happens?
2296         */
2297        gossip_err("pvfs2_aio_retry: critical error "
2298                " x->op = %p, iocb = %p, buffer_index = %d\n",
2299                x->op, x->kiocb, x->buffer_index);
2300        return -EINVAL;
2301    }
2302    /* lock up the op */
2303    spin_lock(&op->lock);
2304    /* check the state of the op */
2305    if (op_state_waiting(op) || op_state_in_progress(op))
2306    {
2307        spin_unlock(&op->lock);
2308        return -EIOCBQUEUED;
2309    }
2310    else
2311    {
2312        /*
2313         * the daemon has finished servicing this
2314         * operation. It has also staged
2315         * the I/O to the data servers on a write
2316         * (if possible) and put the return value
2317         * of the operation in bytes_copied.
2318         * Similarly, on a read the value stored in
2319         * bytes_copied is the error code or the amount
2320         * of data that was copied to user buffers.
2321         */
2322        error = x->bytes_copied;
2323        op->priv = NULL;
2324        spin_unlock(&op->lock);
2325        gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_aio_retry: iov %p,"
2326                " size %d return %d bytes\n",
2327                    x->iov, (int) x->bytes_to_be_copied, (int) error);
2328        if (error > 0)
2329        {
2330            struct inode *inode = iocb->ki_filp->f_mapping->host;
2331            pvfs2_inode_t *pvfs2_inode = PVFS2_I(inode);
2332            if (x->rw == PVFS_IO_READ)
2333            {
2334                SetAtimeFlag(pvfs2_inode);
2335                inode->i_atime = CURRENT_TIME;
2336            }
2337            else
2338            {
2339                SetMtimeFlag(pvfs2_inode);
2340                inode->i_mtime = CURRENT_TIME;
2341            }
2342            mark_inode_dirty_sync(inode);
2343        }
2344        /*
2345         * Now we can happily free up the op,
2346         * and put buffer_index also away
2347         */
2348        if (x->buffer_index >= 0)
2349        {
2350            gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_aio_retry: put bufmap_index "
2351                    " %d\n", x->buffer_index);
2352            pvfs_bufmap_put(x->buffer_index);
2353            x->buffer_index = -1;
2354        }
2355        /* drop refcount of op and deallocate if possible */
2356        put_op(op);
2357        x->needs_cleanup = 0;
2358        /* x is itself deallocated when the destructor is called */
2359        return error;
2360    }
2361}
2362
2363/*
2364 * Using the iocb->private->op->tag field,
2365 * we should try and cancel the I/O
2366 * operation, and also update res->obj
2367 * and res->data to the values
2368 * at the time of cancellation.
2369 * This is called not only by the io_cancel()
2370 * system call, but also by the exit_mm()/aio_cancel_all()
2371 * functions when the process that issued
2372 * the aio operation is about to exit.
2373 */
2374static int
2375pvfs2_aio_cancel(struct kiocb *iocb, struct io_event *event)
2376{
2377    pvfs2_kiocb *x = NULL;
2378    if (iocb == NULL || event == NULL)
2379    {
2380        gossip_err("pvfs2_aio_cancel: Invalid parameters "
2381                " %p, %p!\n", iocb, event);
2382        return -EINVAL;
2383    }
2384    x = (pvfs2_kiocb *) iocb->private;
2385    if (x == NULL)
2386    {
2387        gossip_err("pvfs2_aio_cancel: cannot retrieve "
2388                " pvfs2_kiocb structure!\n");
2389        return -EINVAL;
2390    }
2391    else
2392    {
2393        pvfs2_kernel_op_t *op = NULL;
2394        int ret;
2395        /*
2396         * Do some sanity checks
2397         */
2398        if (x->kiocb != iocb)
2399        {
2400            gossip_err("pvfs2_aio_cancel: kiocb structures "
2401                    "don't match %p %p!\n", x->kiocb, iocb);
2402            return -EINVAL;
2403        }
2404        if ((op = x->op) == NULL)
2405        {
2406            gossip_err("pvfs2_aio_cancel: cannot retreive "
2407                    "pvfs2_kernel_op structure!\n");
2408            return -EINVAL;
2409        }
2410        kiocbSetCancelled(iocb);
2411        get_op(op);
2412        /*
2413         * This will essentially remove it from
2414         * htable_in_progress or from the req list
2415         * as the case may be.
2416         */
2417        clean_up_interrupted_operation(op);
2418        /*
2419         * However, we need to make sure that
2420         * the client daemon is not transferring data
2421         * as we speak! Thus we look at the reference
2422         * counter to determine if that is indeed the case.
2423         */
2424        do
2425        {
2426            int timed_out_or_signal = 0;
2427
2428            DECLARE_WAITQUEUE(wait_entry, current);
2429            /* add yourself to the wait queue */
2430            add_wait_queue_exclusive(
2431                    &op->io_completion_waitq, &wait_entry);
2432
2433            spin_lock(&op->lock);
2434            while (op->io_completed == 0)
2435            {
2436                set_current_state(TASK_INTERRUPTIBLE);
2437                /* We don't need to wait if client-daemon did not get a reference to op */
2438                if (!op_wait(op))
2439                    break;
2440                /*
2441                 * There may be a window if the client-daemon has acquired a reference
2442                 * to op, but not a spin-lock on it yet before which the async
2443                 * canceller (i.e. this piece of code) acquires the same.
2444                 * Consequently we may end up with a
2445                 * race. To prevent that we use the aio_ref_cnt counter.
2446                 */
2447                spin_unlock(&op->lock);
2448                if (!signal_pending(current))
2449                {
2450                    int timeout = MSECS_TO_JIFFIES(1000 * op_timeout_secs);
2451                    if (!schedule_timeout(timeout))
2452                    {
2453                        gossip_debug(GOSSIP_FILE_DEBUG, "Timed out on I/O cancellation - aborting\n");
2454                        timed_out_or_signal = 1;
2455                        spin_lock(&op->lock);
2456                        break;
2457                    }
2458                    spin_lock(&op->lock);
2459                    continue;
2460                }
2461                gossip_debug(GOSSIP_FILE_DEBUG, "signal on Async I/O cancellation - aborting\n");
2462                timed_out_or_signal = 1;
2463                spin_lock(&op->lock);
2464                break;
2465            }
2466            set_current_state(TASK_RUNNING);
2467            remove_wait_queue(&op->io_completion_waitq, &wait_entry);
2468
2469        } while (0);
2470
2471        /* We need to fill up event->res and event->res2 if at all */
2472        if (op_state_serviced(op))
2473        {
2474            op->priv = NULL;
2475            spin_unlock(&op->lock);
2476            event->res = x->bytes_copied;
2477            event->res2 = 0;
2478        }
2479        else if (op_state_in_progress(op))
2480        {
2481            op->priv = NULL;
2482            spin_unlock(&op->lock);
2483            gossip_debug(GOSSIP_FILE_DEBUG, "Trying to cancel operation in "
2484                    " progress %ld\n", (unsigned long) op->tag);
2485            /*
2486             * if operation is in progress we need to send
2487             * a cancellation upcall for this tag
2488             * The return value of that is the cancellation
2489             * event return value.
2490             */
2491            event->res = pvfs2_cancel_op_in_progress(op->tag);
2492            event->res2 = 0;
2493        }
2494        else
2495        {
2496            op->priv = NULL;
2497            spin_unlock(&op->lock);
2498            event->res = -EINTR;
2499            event->res2 = 0;
2500        }
2501        /*
2502         * Drop the buffer pool index
2503         */
2504        if (x->buffer_index >= 0)
2505        {
2506            gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_aio_cancel: put bufmap_index "
2507                    " %d\n", x->buffer_index);
2508            pvfs_bufmap_put(x->buffer_index);
2509            x->buffer_index = -1;
2510        }
2511        /*
2512         * Put reference to op twice,
2513         * once for the reader/writer that initiated
2514         * the op and
2515         * once for the cancel
2516         */
2517        put_op(op);
2518        put_op(op);
2519        x->needs_cleanup = 0;
2520        /*
2521         * This seems to be a weird undocumented
2522         * thing, where the cancel routine is expected
2523         * to manually decrement ki_users field!
2524         * before calling aio_put_req().
2525         */
2526        iocb->ki_users--;
2527        ret = aio_put_req(iocb);
2528        /* x is itself deallocated by the destructor */
2529        return 0;
2530    }
2531}
2532
2533/*
2534 * Destructor is called when the kiocb structure is
2535 * about to be deallocated by the AIO core.
2536 *
2537 * Conceivably, this could be moved onto pvfs2-cache.c
2538 * as the kiocb_dtor() function that can be associated
2539 * with the pvfs2_kiocb object.
2540 */
2541static void pvfs2_aio_dtor(struct kiocb *iocb)
2542{
2543    pvfs2_kiocb *x = iocb->private;
2544    if (x && x->needs_cleanup == 1)
2545    {
2546        /* do a cleanup of the buffers and possibly op */
2547        if (x->buffer_index >= 0)
2548        {
2549            gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_aio_dtor: put bufmap_index "
2550                    " %d\n", x->buffer_index);
2551            pvfs_bufmap_put(x->buffer_index);
2552            x->buffer_index = -1;
2553        }
2554        if (x->op)
2555        {
2556            x->op->priv = NULL;
2557            put_op(x->op);
2558        }
2559        if (x->iov)
2560        {
2561            kfree(x->iov);
2562            x->iov = NULL;
2563        }
2564        x->needs_cleanup = 0;
2565    }
2566    gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_aio_dtor: kiocb_release %p\n", x);
2567    kiocb_release(x);
2568    iocb->private = NULL;
2569    return;
2570}
2571
2572static inline int
2573fill_default_kiocb(pvfs2_kiocb *x,
2574        struct task_struct *tsk,
2575        struct kiocb *iocb, int rw,
2576        int buffer_index, pvfs2_kernel_op_t *op,
2577        const struct iovec *iovec, unsigned long nr_segs,
2578        loff_t offset, size_t count,
2579        int (*aio_cancel)(struct kiocb *, struct io_event *))
2580{
2581    x->tsk = tsk;
2582    x->kiocb = iocb;
2583    x->buffer_index = buffer_index;
2584    x->op = op;
2585    x->rw = rw;
2586    x->bytes_to_be_copied = count;
2587    x->offset = offset;
2588    x->bytes_copied = 0;
2589    x->needs_cleanup = 1;
2590    iocb->ki_cancel = aio_cancel;
2591    /* Allocate a private pointer to store the
2592     * iovector since the caller could pass in a
2593     * local variable for the iovector.
2594     */
2595    x->iov = kmalloc(nr_segs * sizeof(*x->iov), PVFS2_BUFMAP_GFP_FLAGS);
2596    if (x->iov == NULL)
2597    {
2598        return -ENOMEM;
2599    }
2600    memcpy(x->iov, iovec, nr_segs * sizeof(*x->iov));
2601    x->nr_segs = nr_segs;
2602    return 0;
2603}
2604
2605/*
2606 * This function will do the following,
2607 * On an error, it returns a -ve error number.
2608 * For a synchronous iocb, we copy the data into the
2609 * user buffer's before returning and
2610 * the count of how much was actually read.
2611 * For a first-time asynchronous iocb, we submit the
2612 * I/O to the client-daemon and do not wait
2613 * for the matching downcall to be written and we
2614 * return a special -EIOCBQUEUED
2615 * to indicate that we have queued the request.
2616 * NOTE: Unlike typical aio requests
2617 * that get completion notification from interrupt
2618 * context, we get completion notification from a process
2619 * context (i.e. the client daemon).
2620 * TODO: We handle vectored aio requests now but we do
2621 * not handle the case where the total size of IO is
2622 * larger than our FS transfer block size (4 MB
2623 * default).
2624 */
2625static ssize_t do_aio_read_write(struct rw_options *rw)
2626{
2627    struct file *filp;
2628    struct inode *inode;
2629    ssize_t error;
2630    pvfs2_inode_t *pvfs2_inode;
2631    const struct iovec *iov;
2632    unsigned long nr_segs, max_new_nr_segs;
2633    size_t count;
2634    struct kiocb *iocb;
2635    loff_t *offset;
2636    pvfs2_kiocb *x;
2637
2638    error = -EINVAL;
2639    if (!rw || !rw->fnstr || !rw->off.io.offset)
2640    {
2641        gossip_lerr("Invalid parameters (rw %p)\n", rw);
2642        goto out_error;
2643    }
2644    inode = rw->inode;
2645    filp  = rw->file;
2646    iocb  = rw->iocb;
2647    pvfs2_inode = rw->pvfs2_inode;
2648    offset = rw->off.io.offset;
2649    if (!inode || !filp || !pvfs2_inode || !iocb || !offset)
2650    {
2651        gossip_lerr("Invalid parameters\n");
2652        goto out_error;
2653    }
2654    if (iocb->ki_pos != *offset)
2655    {
2656        gossip_lerr("iocb offsets don't match (%llu %llu)\n",
2657                llu(iocb->ki_pos), llu(*offset));
2658        goto out_error;
2659    }
2660    iov = rw->dest.address.iov;
2661    nr_segs = rw->dest.address.nr_segs;
2662    if (iov == NULL || nr_segs < 0)
2663    {
2664        gossip_lerr("Invalid iovector (%p) or invalid iovec count (%ld)\n",
2665                iov, nr_segs);
2666        goto out_error;
2667    }
2668    count = 0;
2669    /* Compute total and max number of segments after split */
2670    if ((max_new_nr_segs = bound_max_iovecs(iov, nr_segs, &count)) < 0)
2671    {
2672        gossip_lerr("%s: could not bound iovecs %ld\n", rw->fnstr, max_new_nr_segs);
2673        goto out_error;
2674    }
2675    if (unlikely(((ssize_t)count)) < 0)
2676    {
2677        gossip_lerr("%s: count overflow\n", rw->fnstr);
2678        goto out_error;
2679    }
2680    /* synchronous I/O */
2681    if (!rw->async)
2682    {
2683        error = do_readv_writev(rw);
2684        /* not sure this is the correct place or way to update ki_pos but it
2685         * definitely needs to occur somehow. otherwise, a write following
2686         * a synchronous writev will not write at the correct file position.
2687         * store the offset from the read/write into the kiocb struct */
2688        iocb->ki_pos = *offset;
2689        goto out_error;
2690    }
2691    /* Asynchronous I/O */
2692    if (rw->type == IO_WRITE)
2693    {
2694        int ret;
2695        /* perform generic tests for sanity of write arguments */
2696#ifdef PVFS2_LINUX_KERNEL_2_4
2697        ret = pvfs2_precheck_file_write(filp, inode, &count, offset);
2698#else
2699        ret = generic_write_checks(filp, offset, &count, S_ISBLK(inode->i_mode));
2700#endif
2701        if (ret != 0)
2702        {
2703            gossip_err("%s: failed generic "
2704                    " argument checks.\n", rw->fnstr);
2705            return ret;
2706        }
2707    }
2708    if (count == 0)
2709    {
2710        error = 0;
2711        goto out_error;
2712    }
2713    else if (count > pvfs_bufmap_size_query())
2714    {
2715        /* TODO: Asynchronous I/O operation is not allowed to
2716         * be greater than our block size
2717         */
2718        gossip_lerr("%s: cannot transfer (%zd) bytes"
2719                " (larger than block size %d)\n",
2720                rw->fnstr, count, pvfs_bufmap_size_query());
2721        goto out_error;
2722    }
2723    gossip_debug(GOSSIP_FILE_DEBUG, "Posting asynchronous I/O operation\n");
2724    /* First time submission */
2725    if ((x = (pvfs2_kiocb *) iocb->private) == NULL)
2726    {
2727        int buffer_index = -1;
2728        pvfs2_kernel_op_t *new_op = NULL;
2729        pvfs2_inode_t *pvfs2_inode = PVFS2_I(inode);
2730       
2731        new_op = op_alloc(PVFS2_VFS_OP_FILE_IO);
2732        if (!new_op)
2733        {
2734            error = -ENOMEM;
2735            goto out_error;
2736        }
2737        /* Increase ref count */
2738        get_op(new_op);
2739        /* Asynchronous I/O */
2740        new_op->upcall.req.io.async_vfs_io = PVFS_VFS_ASYNC_IO;
2741        new_op->upcall.req.io.io_type = (rw->type == IO_READ) ?
2742                                        PVFS_IO_READ : PVFS_IO_WRITE;
2743        new_op->upcall.req.io.refn = pvfs2_inode->refn;
2744        error = pvfs_bufmap_get(&buffer_index);
2745        if (error < 0)
2746        {
2747            gossip_debug(GOSSIP_FILE_DEBUG, "%s: pvfs_bufmap_get()"
2748                    " failure %ld\n", rw->fnstr, (long) error);
2749            /* drop ref count and possibly de-allocate */
2750            put_op(new_op);
2751            goto out_error;
2752        }
2753        gossip_debug(GOSSIP_FILE_DEBUG, "%s: pvfs_bufmap_get %d\n",
2754                rw->fnstr, buffer_index);
2755        new_op->upcall.req.io.buf_index = buffer_index;
2756        new_op->upcall.req.io.count = count;
2757        new_op->upcall.req.io.offset = *offset;
2758        if (rw->type == IO_WRITE)
2759        {
2760            /*
2761             * copy the data from the application for writes.
2762             * We could return -EIOCBRETRY here and have
2763             * the data copied in the pvfs2_aio_retry routine,
2764             * I dont see too much point in doing that
2765             * since the app would have touched the
2766             * memory pages prior to the write and
2767             * hence accesses to the page won't block.
2768             */
2769            if (rw->copy_to_user_addresses)
2770            {
2771                error = pvfs_bufmap_copy_iovec_from_user(
2772                        buffer_index,
2773                        iov,
2774                        nr_segs,
2775                        count);
2776            }
2777            else
2778            {
2779                error = pvfs_bufmap_copy_iovec_from_kernel(
2780                        buffer_index,
2781                        iov,
2782                        nr_segs,
2783                        count);
2784            }
2785            if (error < 0)
2786            {
2787                gossip_err("%s: Failed to copy user buffer %ld. Make sure that pvfs2-client-core"
2788                        " is still running \n", rw->fnstr, (long) error);
2789                /* drop the buffer index */
2790                pvfs_bufmap_put(buffer_index);
2791                gossip_debug(GOSSIP_FILE_DEBUG, "%s: pvfs_bufmap_put %d\n",
2792                        rw->fnstr, buffer_index);
2793                /* drop the reference count and deallocate */
2794                put_op(new_op);
2795                goto out_error;
2796            }
2797        }
2798        x = kiocb_alloc();
2799        if (x == NULL)
2800        {
2801            error = -ENOMEM;
2802            /* drop the buffer index */
2803            pvfs_bufmap_put(buffer_index);
2804            gossip_debug(GOSSIP_FILE_DEBUG, "%s: pvfs_bufmap_put %d\n",
2805                    rw->fnstr, buffer_index);
2806            /* drop the reference count and deallocate */
2807            put_op(new_op);
2808            goto out_error;
2809        }
2810        gossip_debug(GOSSIP_FILE_DEBUG, "kiocb_alloc: %p\n", x);
2811        /*
2812         * We need to set the cancellation callbacks +
2813         * other state information
2814         * here if the asynchronous request is going to
2815         * be successfully submitted
2816         */
2817        error = fill_default_kiocb(x, current, iocb,
2818                                   (rw->type == IO_READ) ? PVFS_IO_READ : PVFS_IO_WRITE,
2819                                   buffer_index,
2820                                   new_op, iov, nr_segs,
2821                                   *offset, count,
2822                                   &pvfs2_aio_cancel);
2823        if (error != 0)
2824        {
2825            kiocb_release(x);
2826            /* drop the buffer index */
2827            pvfs_bufmap_put(buffer_index);
2828            gossip_debug(GOSSIP_FILE_DEBUG, "%s: pvfs_bufmap_put %d\n",
2829                    rw->fnstr, buffer_index);
2830            /* drop the reference count and deallocate */
2831            put_op(new_op);
2832            goto out_error;
2833        }
2834        /*
2835         * destructor function to make sure that we free
2836         * up this allocated piece of memory
2837         */
2838        iocb->ki_dtor = pvfs2_aio_dtor;
2839        /*
2840         * We need to be able to retrieve this structure from
2841         * the op structure as well, since the client-daemon
2842         * needs to send notifications upon aio_completion.
2843         */
2844        new_op->priv = x;
2845        /* and stash it away in the kiocb structure as well */
2846        iocb->private = x;
2847        /*
2848         * Add it to the list of ops to be serviced
2849         * but don't wait for it to be serviced.
2850         * Return immediately
2851         */
2852        service_operation(new_op, rw->fnstr,
2853                PVFS2_OP_ASYNC);
2854        gossip_debug(GOSSIP_FILE_DEBUG, "%s: queued "
2855                " operation [%llu for %zd]\n",
2856                rw->fnstr, llu(*offset), count);
2857        error = -EIOCBQUEUED;
2858        /*
2859         * All cleanups done upon completion
2860         * (OR) cancellation!
2861         */
2862    }
2863    /* I don't think this path will ever be taken */
2864    else { /* retry and see what is the status! */
2865        error = pvfs2_aio_retry(iocb);
2866    }
2867out_error:
2868    return error;
2869}
2870
2871static ssize_t pvfs2_file_aio_read_iovec(struct kiocb *iocb,
2872                                         const struct iovec *iov,
2873                                         unsigned long nr_segs, loff_t offset)
2874{
2875    struct rw_options rw;
2876
2877    gossip_err("Executing pvfs2_file_aio_read_iovec.  offset:%lld \ttotal length:%zd\n"
2878              ,(long long)offset
2879              ,iov_length(iov,nr_segs));
2880
2881    memset(&rw, 0, sizeof(rw));
2882    rw.async = !is_sync_kiocb(iocb);
2883    rw.type = IO_READ;
2884    rw.copy_dest_type = COPY_DEST_ADDRESSES;
2885    rw.off.io.offset = &offset;
2886    rw.copy_to_user_addresses = 1;
2887    rw.fnstr = __FUNCTION__;
2888    rw.iocb = iocb;
2889    rw.file = iocb->ki_filp;
2890    if (!rw.file || !(rw.file)->f_mapping)
2891    {
2892        return -EINVAL;
2893    }
2894    rw.inode = (rw.file)->f_mapping->host;
2895    rw.pvfs2_inode = PVFS2_I(rw.inode);
2896    rw.dest.address.iov = iov;
2897    rw.dest.address.nr_segs = nr_segs;
2898    rw.readahead_size = 0;
2899    g_pvfs2_stats.reads++;
2900    return do_aio_read_write(&rw);
2901}
2902
2903static ssize_t pvfs2_file_aio_write_iovec(struct kiocb *iocb,
2904                                          const struct iovec *iov,
2905                                          unsigned long nr_segs, loff_t offset)
2906{
2907    struct rw_options rw;
2908
2909    memset(&rw, 0, sizeof(rw));
2910    rw.async = !is_sync_kiocb(iocb);
2911    rw.type = IO_WRITE;
2912    rw.copy_dest_type = COPY_DEST_ADDRESSES;
2913    rw.readahead_size = 0;
2914    rw.off.io.offset = &offset;
2915    rw.copy_to_user_addresses = 1;
2916    rw.fnstr = __FUNCTION__;
2917    rw.iocb = iocb;
2918    rw.file = iocb->ki_filp;
2919    if (!rw.file || !(rw.file)->f_mapping)
2920    {
2921        return -EINVAL;
2922    }
2923    rw.inode = (rw.file)->f_mapping->host;
2924    rw.pvfs2_inode = PVFS2_I(rw.inode);
2925    rw.dest.address.iov = iov;
2926    rw.dest.address.nr_segs = nr_segs;
2927    g_pvfs2_stats.writes++;
2928    return do_aio_read_write(&rw);
2929}
2930
2931/* compat functions for < 2.6.19 */
2932#ifndef HAVE_COMBINED_AIO_AND_VECTOR
2933static ssize_t
2934pvfs2_file_aio_read(struct kiocb *iocb, char __user *buffer,
2935        size_t count, loff_t offset)
2936
2937{
2938    struct iovec iov = {
2939        .iov_base = buffer,
2940        .iov_len = count,
2941    };
2942    return pvfs2_file_aio_read_iovec(iocb, &iov, 1, offset);
2943}
2944
2945static ssize_t
2946pvfs2_file_aio_write(struct kiocb *iocb, const char __user *buffer,
2947        size_t count, loff_t offset)
2948{
2949    struct iovec iov = {
2950        .iov_base = (void __user *) buffer,  /* discard const so it fits */
2951        .iov_len = count,
2952    };
2953    return pvfs2_file_aio_write_iovec(iocb, &iov, 1, offset);
2954}
2955#endif
2956#endif  /* HAVE_AIO_VFS_SUPPORT */
2957
2958/** Perform a miscellaneous operation on a file.
2959 */
2960
2961#ifdef HAVE_NO_FS_IOC_FLAGS
2962#ifdef HAVE_UNLOCKED_IOCTL_HANDLER
2963long pvfs2_ioctl(
2964#else
2965int pvfs2_ioctl(
2966        struct inode *inode,
2967#endif /* HAVE_UNLOCKED_IOCTL_HANDLER */
2968        struct file *file,
2969        unsigned int cmd,
2970        unsigned long arg)
2971{
2972    return -ENOTTY;
2973}
2974#else
2975
2976#ifdef HAVE_UNLOCKED_IOCTL_HANDLER
2977long pvfs2_ioctl(
2978#else
2979int pvfs2_ioctl(
2980    struct inode *inode,
2981#endif /* HAVE_UNLOCKED_IOCTL_HANDLER */
2982    struct file *file,
2983    unsigned int cmd,
2984    unsigned long arg)
2985{
2986    int ret = -ENOTTY;
2987    uint64_t val = 0;
2988    unsigned long uval;
2989
2990    gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_ioctl: called with cmd %d\n", cmd);
2991
2992    /* we understand some general ioctls on files, such as the immutable
2993     * and append flags
2994     */
2995    if(cmd == FS_IOC_GETFLAGS)
2996    {
2997        val = 0;
2998        ret = pvfs2_xattr_get_default(
2999#ifdef HAVE_XATTR_HANDLER_GET_FIVE_PARAM
3000                file->f_dentry,
3001#else
3002                file->f_dentry->d_inode,
3003#endif /* HAVE_XATTR_HANDLER_GET_FIVE_PARAM */
3004                "user.pvfs2.meta_hint",
3005                &val,
3006                sizeof(val)
3007#ifdef HAVE_XATTR_HANDLER_GET_FIVE_PARAM
3008                , 0
3009#endif /* HAVE_XATTR_HANDLER_GET_FIVE_PARAM */
3010                );
3011        if(ret < 0 && ret != -ENODATA)
3012        {
3013            return ret;
3014        }
3015        else if(ret == -ENODATA)
3016        {
3017            val = 0;
3018        }
3019        uval = val;
3020        gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_ioctl: FS_IOC_GETFLAGS: %llu\n",
3021                     (unsigned long long)uval);
3022        return put_user(uval, (int __user *)arg);
3023    }
3024    else if(cmd == FS_IOC_SETFLAGS)
3025    {
3026        ret = 0;
3027        if(get_user(uval, (int __user *)arg))
3028        {
3029            return -EFAULT;
3030        }
3031        /* PVFS_MIRROR_FL is set internally when the mirroring mode is turned
3032         * on for a file.  The user is not allowed to turn on this bit, but the
3033         * bit is present if the user first gets the flags and then updates the
3034         * flags with some new settings. So, we ignore it in the following
3035         * edit. bligon.
3036        */
3037        if((uval & ~PVFS_MIRROR_FL) &
3038           (~(FS_IMMUTABLE_FL|FS_APPEND_FL|FS_NOATIME_FL)))
3039        {
3040            gossip_err("pvfs2_ioctl: the FS_IOC_SETFLAGS only supports setting "
3041                       "one of FS_IMMUTABLE_FL|FS_APPEND_FL|FS_NOATIME_FL\n");
3042            return -EINVAL;
3043        }
3044        val = uval;
3045        gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_ioctl: FS_IOC_SETFLAGS: %llu\n",
3046                     (unsigned long long)val);
3047        ret = pvfs2_xattr_set_default(
3048#ifdef HAVE_XATTR_HANDLER_SET_SIX_PARAM
3049                file->f_dentry,
3050#else
3051                file->f_dentry->d_inode,
3052#endif /* HAVE_XATTR_HANDLER_SET_SIX_PARAM */
3053                "user.pvfs2.meta_hint",
3054                &val,
3055                sizeof(val),
3056                0
3057#ifdef HAVE_XATTR_HANDLER_SET_SIX_PARAM
3058                , 0                                     
3059#endif /* HAVE_XATTR_HANDLER_SET_SIX_PARAM */
3060                );
3061    }
3062
3063    return ret;
3064}
3065#endif
3066
3067/** Memory map a region of a file.
3068 */
3069static int pvfs2_file_mmap(struct file *file, struct vm_area_struct *vma)
3070{
3071    struct inode *inode = file->f_dentry->d_inode;
3072
3073    gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_file_mmap: called on %s\n",
3074                (file ? (char *)file->f_dentry->d_name.name :
3075                 (char *)"Unknown"));
3076
3077    /* we don't support mmap writes, or SHARED mmaps at all */
3078    if ((vma->vm_flags & VM_SHARED) || (vma->vm_flags & VM_MAYSHARE))
3079    {
3080        return -EINVAL;
3081    }
3082
3083    /*
3084      for mmap on pvfs2, make sure we use pvfs2 specific address
3085      operations by explcitly setting the operations
3086    */
3087    inode->i_mapping->host = inode;
3088    inode->i_mapping->a_ops = &pvfs2_address_operations;
3089
3090    /* set the sequential readahead hint */
3091    vma->vm_flags |= VM_SEQ_READ;
3092    vma->vm_flags &= ~VM_RAND_READ;
3093
3094    /* have the kernel enforce readonly mmap support for us */
3095#ifdef PVFS2_LINUX_KERNEL_2_4
3096    vma->vm_flags &= ~VM_MAYWRITE;
3097    return generic_file_mmap(file, vma);
3098#else
3099    /* backing_dev_info isn't present on 2.4.x */
3100    inode->i_mapping->backing_dev_info = &pvfs2_backing_dev_info;
3101    return generic_file_readonly_mmap(file, vma);
3102#endif
3103}
3104
#ifndef HAVE_MAPPING_NRPAGES_MACRO
/* Fallback used when HAVE_MAPPING_NRPAGES_MACRO is not defined: reads the
 * nrpages member of the pointed-to object. The whole expansion is
 * parenthesized so it behaves as one operand in any expression. */
#define mapping_nrpages(idata) ((idata)->nrpages)
#endif
3108
3109/** Called to notify the module that there are no more references to
3110 *  this file (i.e. no processes have it open).
3111 *
3112 *  \note Not called when each file is closed.
3113 */
3114int pvfs2_file_release(
3115    struct inode *inode,
3116    struct file *file)
3117{
3118    gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_file_release: called on %s\n",
3119                file->f_dentry->d_name.name);
3120
3121    pvfs2_flush_inode(inode);
3122    if (S_ISDIR(inode->i_mode))
3123    {
3124        return dcache_dir_close(inode, file);
3125    }
3126
3127    /*
3128      remove all associated inode pages from the page cache and mmap
3129      readahead cache (if any); this forces an expensive refresh of
3130      data for the next caller of mmap (or 'get_block' accesses)
3131    */
3132    if (file->f_dentry->d_inode &&
3133        file->f_dentry->d_inode->i_mapping &&
3134        mapping_nrpages(&file->f_dentry->d_inode->i_data))
3135    {
3136        clear_inode_mmap_ra_cache(file->f_dentry->d_inode);
3137        truncate_inode_pages(file->f_dentry->d_inode->i_mapping, 0);
3138    }
3139    return 0;
3140}
3141
3142/** Push all data for a specific file onto permanent storage.
3143 */
3144int pvfs2_fsync(
3145    struct file *file,
3146#ifdef HAVE_FSYNC_DENTRY_PARAM
3147    struct dentry *dentry,
3148#endif
3149    int datasync)
3150{
3151    int ret = -EINVAL;
3152    pvfs2_inode_t *pvfs2_inode = PVFS2_I(file->f_dentry->d_inode);
3153    pvfs2_kernel_op_t *new_op = NULL;
3154
3155    new_op = op_alloc(PVFS2_VFS_OP_FSYNC);
3156    if (!new_op)
3157    {
3158        return -ENOMEM;
3159    }
3160    new_op->upcall.req.fsync.refn = pvfs2_inode->refn;
3161
3162    ret = service_operation(new_op, "pvfs2_fsync",
3163            get_interruptible_flag(file->f_dentry->d_inode));
3164
3165    gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_fsync got return value of %d\n",ret);
3166
3167    op_release(new_op);
3168
3169    pvfs2_flush_inode(file->f_dentry->d_inode);
3170    return ret;
3171}
3172
3173/** Change the file pointer position for an instance of an open file.
3174 *
3175 *  \note If .llseek is overriden, we must acquire lock as described in
3176 *        Documentation/filesystems/Locking.
3177 */
3178loff_t pvfs2_file_llseek(struct file *file, loff_t offset, int origin)
3179{
3180    int ret = -EINVAL;
3181    struct inode *inode = file->f_dentry->d_inode;
3182
3183    if (!inode)
3184    {
3185        gossip_err("pvfs2_file_llseek: invalid inode (NULL)\n");
3186        return ret;
3187    }
3188
3189    if (origin == PVFS2_SEEK_END)
3190    {
3191        /* revalidate the inode's file size.
3192         * NOTE: We are only interested in file size here, so we set mask accordingly
3193         */
3194        ret = pvfs2_inode_getattr(inode, PVFS_ATTR_SYS_SIZE);
3195        if (ret)
3196        {
3197            gossip_debug(GOSSIP_FILE_DEBUG, "%s:%s:%d calling make bad inode\n", __FILE__,  __func__, __LINE__);
3198            pvfs2_make_bad_inode(inode);
3199            return ret;
3200        }
3201    }
3202
3203    gossip_debug(GOSSIP_FILE_DEBUG, "pvfs2_file_llseek: offset is %ld | origin is %d | "
3204                "inode size is %lu\n", (long)offset, origin,
3205                (unsigned long)file->f_dentry->d_inode->i_size);
3206
3207    return generic_file_llseek(file, offset, origin);
3208}
3209
3210/*
3211 * Apache uses the sendfile system call to stuff page-sized file data to
3212 * a socket. Unfortunately, the generic_sendfile function exported by
3213 * the kernel uses the page-cache and does I/O in pagesize granularities
3214 * and this leads to undesirable consistency problems not to mention performance
3215 * limitations.
3216 * Consequently, we chose to override the default callback by bypassing the page-cache.
3217 * Although, we could read larger than page-sized buffers from the file,
3218 * the actor routine does not know how to handle > 1 page buffer at a time.
3219 * So we still end up breaking things down. darn...
3220 */
3221#ifdef HAVE_SENDFILE_VFS_SUPPORT
3222
3223static void do_bypass_page_cache_read(struct file *filp, loff_t *ppos,
3224        read_descriptor_t *desc, read_actor_t actor)
3225{
3226    struct inode *inode = NULL;
3227    struct address_space *mapping = NULL;
3228    struct page *uncached_page = NULL;
3229    unsigned long kaddr = 0;
3230    unsigned long offset;
3231    loff_t isize;
3232    unsigned long begin_index, end_index;
3233    long prev_index;
3234    int to_free = 0;
3235
3236    mapping = filp->f_mapping;
3237    inode   = mapping->host;
3238    /* offset in file in terms of page_cache_size */
3239    begin_index = *ppos >> PAGE_CACHE_SHIFT;
3240    offset = *ppos & ~PAGE_CACHE_MASK;
3241
3242    isize = pvfs2_i_size_read(inode);
3243    if (!isize)
3244    {
3245        return;
3246    }
3247    end_index = (isize - 1) >> PAGE_CACHE_SHIFT;
3248    prev_index = -1;
3249    /* copy page-sized units at a time using the actor routine */
3250    for (;;)
3251    {
3252        unsigned long nr, ret, error;
3253
3254        /* Are we reading beyond what exists */
3255        if (begin_index > end_index)
3256        {
3257            break;
3258        }
3259        /* issue a file-system read call to fill this buffer which is in kernel space */
3260        if (prev_index != begin_index)
3261        {
3262            loff_t file_offset;
3263            file_offset = (begin_index << PAGE_CACHE_SHIFT);
3264            /* Allocate a page, but don't add it to the pagecache proper */
3265            kaddr = __get_free_page(mapping_gfp_mask(mapping));
3266            if (kaddr == 0UL)
3267            {
3268                desc->error = -ENOMEM;
3269                break;
3270            }
3271            to_free = 1;
3272            uncached_page = virt_to_page(kaddr);
3273            gossip_debug(GOSSIP_FILE_DEBUG, "begin_index = %lu offset = %lu file_offset = %ld\n",
3274                    (unsigned long) begin_index, (unsigned long) offset, (unsigned long)file_offset);
3275
3276            error = pvfs2_inode_read(inode, (void *) kaddr, PAGE_CACHE_SIZE, &file_offset, 0, 0);
3277            prev_index = begin_index;
3278        }
3279        else {
3280            error = 0;
3281        }
3282        /*
3283         * In the unlikely event of an error, bail out
3284         */
3285        if (unlikely(error < 0))
3286        {
3287            desc->error = error;
3288            break;
3289        }
3290        /* nr is the maximum amount of bytes to be copied from this page */
3291        nr = PAGE_CACHE_SIZE;
3292        if (begin_index >= end_index)
3293        {
3294            if (begin_index > end_index)
3295            {
3296                break;
3297            }
3298            /* Adjust the number of bytes on the last page */
3299            nr = ((isize - 1) & ~PAGE_CACHE_MASK) + 1;
3300            /* Do we have fewer valid bytes in the file than what was requested? */
3301            if (nr <= offset)
3302            {
3303                break;
3304            }
3305        }
3306        nr = nr - offset;
3307
3308        ret = actor(desc, uncached_page, offset, nr);
3309        gossip_debug(GOSSIP_FILE_DEBUG, "actor with offset %lu nr %lu return %lu desc->count %lu\n",
3310                (unsigned long) offset, (unsigned long) nr, (unsigned long) ret, (unsigned long) desc->count);
3311
3312        offset += ret;
3313        begin_index += (offset >> PAGE_CACHE_SHIFT);
3314        offset &= ~PAGE_CACHE_MASK;
3315        if (to_free == 1)
3316        {
3317            free_page(kaddr);
3318            to_free = 0;
3319        }
3320        if (ret == nr && desc->count)
3321            continue;
3322        break;
3323    }
3324    if (to_free == 1)
3325    {
3326        free_page(kaddr);
3327        to_free = 0;
3328    }
3329    *ppos = (begin_index << PAGE_CACHE_SHIFT) + offset;
3330    file_accessed(filp);
3331    return;
3332}
3333
3334static ssize_t pvfs2_sendfile(struct file *filp, loff_t *ppos,
3335        size_t count, read_actor_t actor, void *target)
3336{
3337    int error;
3338    read_descriptor_t desc;
3339
3340    desc.written = 0;
3341    desc.count = count;
3342#ifdef HAVE_ARG_IN_READ_DESCRIPTOR_T
3343    desc.arg.data = target;
3344#else
3345    desc.buf = target;
3346#endif
3347    desc.error = 0;
3348
3349    /*
3350     * Revalidate the inode so that i_size_read will
3351     * return the appropriate size
3352     */
3353    if ((error = pvfs2_inode_getattr(filp->f_mapping->host, PVFS_ATTR_SYS_SIZE)) < 0)
3354    {
3355        return error;
3356    }
3357
3358    /* Do a blocking read from the file and invoke the actor appropriately */
3359    do_bypass_page_cache_read(filp, ppos, &desc, actor);
3360    if (desc.written)
3361        return desc.written;
3362    return desc.error;
3363}
3364
3365#endif
3366
3367int pvfs2_lock(struct file *f, int flags, struct file_lock *lock)
3368{
3369    return -ENOSYS;
3370}
3371
3372/** PVFS2 implementation of VFS file operations */
3373struct file_operations pvfs2_file_operations =
3374{
3375#ifdef PVFS2_LINUX_KERNEL_2_4
3376    llseek : pvfs2_file_llseek,
3377    read : pvfs2_file_read,
3378    write : pvfs2_file_write,
3379    readv : pvfs2_file_readv,
3380    writev : pvfs2_file_writev,
3381    ioctl : pvfs2_ioctl,
3382    mmap : pvfs2_file_mmap,
3383    open : pvfs2_file_open,
3384    release : pvfs2_file_release,
3385    fsync : pvfs2_fsync
3386#else
3387    .llseek = pvfs2_file_llseek,
3388    .read = pvfs2_file_read,
3389    .write = pvfs2_file_write,
3390#ifdef HAVE_COMBINED_AIO_AND_VECTOR
3391    /* for >= 2.6.19 */
3392#ifdef HAVE_AIO_VFS_SUPPORT
3393    .aio_read = pvfs2_file_aio_read_iovec,
3394    .aio_write = pvfs2_file_aio_write_iovec,
3395#endif
3396    .lock = pvfs2_lock,
3397#else
3398    .readv = pvfs2_file_readv,
3399    .writev = pvfs2_file_writev,
3400#  ifdef HAVE_AIO_VFS_SUPPORT
3401    .aio_read = pvfs2_file_aio_read,
3402    .aio_write = pvfs2_file_aio_write,
3403#  endif
3404#endif
3405#ifdef HAVE_UNLOCKED_IOCTL_HANDLER
3406    .unlocked_ioctl = pvfs2_ioctl,
3407#else
3408    .ioctl = pvfs2_ioctl,
3409#endif /* HAVE_UNLOCKED_IOCTL_HANDLER */
3410    .mmap = pvfs2_file_mmap,
3411    .open = pvfs2_file_open,
3412    .release = pvfs2_file_release,
3413    .fsync = pvfs2_fsync,
3414#ifdef HAVE_SENDFILE_VFS_SUPPORT
3415    .sendfile = pvfs2_sendfile,
3416#endif
3417#ifdef HAVE_READX_FILE_OPERATIONS
3418    .readx = pvfs2_file_readx,
3419#endif
3420#ifdef HAVE_WRITEX_FILE_OPERATIONS
3421    .writex = pvfs2_file_writex,
3422#endif
3423    .lock = pvfs2_lock,
3424#endif
3425};
3426
3427#ifdef PVFS2_LINUX_KERNEL_2_4
3428/*
3429 * pvfs2_precheck_file_write():
3430 * Check the conditions on a file descriptor prior to beginning a write
3431 * on it.  Contains the common precheck code for both buffered and direct
3432 * IO.
3433 *
3434 * NOTE: this function is a modified version of precheck_file_write() from
3435 * 2.4.x.  precheck_file_write() is not exported so we are forced to
3436 * duplicate it here.
3437 */
3438static int pvfs2_precheck_file_write(struct file *file, struct inode *inode,
3439    size_t *count, loff_t *ppos)
3440{
3441    ssize_t       err;
3442    unsigned long limit = current->rlim[RLIMIT_FSIZE].rlim_cur;
3443    loff_t        pos = *ppos;
3444   
3445    err = -EINVAL;
3446    if (pos < 0)
3447        goto out;
3448
3449    err = file->f_error;
3450    if (err) {
3451        file->f_error = 0;
3452        goto out;
3453    }
3454
3455    /* FIXME: this is for backwards compatibility with 2.4 */
3456    if (!S_ISBLK(inode->i_mode) && (file->f_flags & O_APPEND))
3457        *ppos = pos = inode->i_size;
3458
3459    /*
3460     * Check whether we've reached the file size limit.
3461     */
3462    err = -EFBIG;
3463   
3464    if (!S_ISBLK(inode->i_mode) && limit != RLIM_INFINITY) {
3465        if (pos >= limit) {
3466            send_sig(SIGXFSZ, current, 0);
3467            goto out;
3468        }
3469        if (pos > 0xFFFFFFFFULL || *count > limit - (u32)pos) {
3470            /* send_sig(SIGXFSZ, current, 0); */
3471            *count = limit - (u32)pos;
3472        }
3473    }
3474
3475    /*
3476     *    LFS rule
3477     */
3478    if ( pos + *count > MAX_NON_LFS && !(file->f_flags&O_LARGEFILE)) {
3479        if (pos >= MAX_NON_LFS) {
3480            send_sig(SIGXFSZ, current, 0);
3481            goto out;
3482        }
3483        if (*count > MAX_NON_LFS - (u32)pos) {
3484            /* send_sig(SIGXFSZ, current, 0); */
3485            *count = MAX_NON_LFS - (u32)pos;
3486        }
3487    }
3488
3489    /*
3490     *    Are we about to exceed the fs block limit ?
3491     *
3492     *    If we have written data it becomes a short write
3493     *    If we have exceeded without writing data we send
3494     *    a signal and give them an EFBIG.
3495     *
3496     *    Linus frestrict idea will clean these up nicely..
3497     */
3498     
3499    if (!S_ISBLK(inode->i_mode)) {
3500        if (pos >= inode->i_sb->s_maxbytes)
3501        {
3502            if (*count || pos > inode->i_sb->s_maxbytes) {
3503                send_sig(SIGXFSZ, current, 0);
3504                err = -EFBIG;
3505                goto out;
3506            }
3507            /* zero-length writes at ->s_maxbytes are OK */
3508        }
3509
3510        if (pos + *count > inode->i_sb->s_maxbytes)
3511            *count = inode->i_sb->s_maxbytes - pos;
3512    } else {
3513        if (is_read_only(inode->i_rdev)) {
3514            err = -EPERM;
3515            gossip_err("Operation not permitted on read only file system\n");
3516            goto out;
3517        }
3518        if (pos >= inode->i_size) {
3519            if (*count || pos > inode->i_size) {
3520                err = -ENOSPC;
3521                goto out;
3522            }
3523        }
3524
3525        if (pos + *count > inode->i_size)
3526            *count = inode->i_size - pos;
3527    }
3528
3529    err = 0;
3530out:
3531    return err;
3532}
3533#endif
3534
3535/*
3536 * Local variables:
3537 *  c-indent-level: 4
3538 *  c-basic-offset: 4
3539 * End:
3540 *
3541 * vim: ts=8 sts=4 sw=4 expandtab
3542 */
Note: See TracBrowser for help on using the browser.