root/branches/Orange-Branch/src/server/lookup.sm @ 8317

Revision 8317, 19.4 KB (checked in by bligon, 3 years ago)

merge of B2O-Blue-Sync,HEAD,and latest of Orange.

Line 
1/*
2 * (C) 2001 Clemson University and The University of Chicago
3 *
4 * See COPYING in top-level directory.
5 */
6
7#include <string.h>
8#include <stddef.h>
9#include <assert.h>
10#include <stdio.h>
11#include <stdlib.h>
12#ifdef HAVE_MALLOC_H
13#include <malloc.h>
14#endif
15
16#include "server-config.h"
17#include "pvfs2-server.h"
18#include "pvfs2-attr.h"
19#include "str-utils.h"
20#include "pint-util.h"
21#include "pvfs2-internal.h"
22
/*
 * Private (positive) job status codes used to steer pvfs2_lookup_sm.
 * They are stored in js_p->error_code by the state actions in this file
 * and are distinct from the negative -PVFS_* error codes.
 *
 * STATE_ENOTDIR         - set by lookup_verify_object_metadata when the
 *                         current object is a metafile or a symlink, so
 *                         the path walk stops here.
 * STATE_NOMORESEGS      - set by lookup_verify_object_metadata when every
 *                         path segment has been resolved.
 * LOOKUP_CHECK_DIR_ACLS - set by lookup_verify_object_metadata when the
 *                         mode check refused traversal and the ACLs must
 *                         be consulted before giving up.
 */
enum
{
    STATE_ENOTDIR         = 22,
    STATE_NOMORESEGS      = 23,
    LOOKUP_CHECK_DIR_ACLS = 24
};
29
30%%
31
32machine pvfs2_lookup_sm
33{
34    state prelude
35    {
36        jump pvfs2_prelude_sm;
37        success => init;
38        default => final_response;
39    }
40
41    state init
42    {
43        run lookup_init;
44        STATE_ENOTDIR => setup_resp;
45        default => read_object_metadata;
46    }
47
48    state read_object_metadata
49    {
50        run lookup_read_object_metadata;
51        success => verify_object_metadata;
52        default => setup_resp;
53    }
54
55    state verify_object_metadata
56    {
57        run lookup_verify_object_metadata;
58        LOOKUP_CHECK_DIR_ACLS => read_directory_acls;
59        success => read_directory_entry_handle;
60        default => setup_resp;
61    }
62
63    state read_directory_acls
64    {
65        run lookup_check_acls_if_needed;
66        default => check_acls;
67    }
68
69    state check_acls
70    {
71        run lookup_check_acls;
72        success => read_directory_entry_handle;
73        default => setup_resp;
74    }
75
76    state read_directory_entry_handle
77    {
78        run lookup_read_directory_entry_handle;
79        success => read_directory_entry;
80        default => setup_resp;
81    }
82
83    state read_directory_entry
84    {
85        run lookup_read_directory_entry;
86        success => read_object_metadata;
87        default => setup_resp;
88    }
89   
90    state setup_resp
91    {
92        run lookup_setup_resp;
93        default => final_response;
94    }
95
96    state final_response
97    {
98        jump pvfs2_final_response_sm;
99        default => cleanup;
100    }
101
102    state cleanup
103    {
104        run lookup_cleanup;
105        default => terminate;
106    }
107}
108
109%%
110
111/*
112 * Function: lookup_init
113 *
114 * Synopsis: initializes internal structures and posts job to request
115 * scheduler.
116 *
117 * Assumes req structure holds a valid path.
118 *
119 * Initializes segp, seg_ct, seg_nr fields in s_op->u.lookup.
120 *
121 * Allocates memory for handle and attribute arrays that will be
122 * returned in the response.
123 *
124 * Note: memory is allocated as one big chunk, pointed to by
125 * s_op->resp.u.lookup_path.handle_array.
126 *
127 */
128static PINT_sm_action lookup_init(
129        struct PINT_smcb *smcb, job_status_s *js_p)
130{
131    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
132    char *ptr = NULL;
133
134
135    /* fill in the lookup portion of the PINT_server_op */
136    s_op->u.lookup.segp = NULL;
137    s_op->u.lookup.seg_nr = 0;
138    s_op->u.lookup.seg_ct = PINT_string_count_segments(
139        s_op->req->u.lookup_path.path);
140    s_op->u.lookup.handle_ct = 0;
141    s_op->u.lookup.attr_ct = 0;
142
143    gossip_debug(GOSSIP_SERVER_DEBUG, " STARTING LOOKUP REQUEST "
144                     "(path:%s)(fs_id:%d)(handle:%llu)(attrmask:%u)"
145                     "(# of segments:%u)\n",
146                     s_op->req->u.lookup_path.path,
147                     s_op->req->u.lookup_path.fs_id,
148                     llu(s_op->req->u.lookup_path.handle),
149                     s_op->req->u.lookup_path.attrmask,
150                     s_op->u.lookup.seg_ct);
151    if ((s_op->u.lookup.seg_ct < 0) ||
152        (s_op->u.lookup.seg_ct > PVFS_REQ_LIMIT_PATH_SEGMENT_COUNT))
153    {
154        gossip_err("  invalid path %s (bad segment count); "
155                   "sending error response\n",
156                   s_op->req->u.lookup_path.path);
157        js_p->error_code = -PVFS_ENOTDIR;
158        return SM_ACTION_DEFERRED;
159    }
160
161    /* allocate the internal ds_attr_array */
162    s_op->u.lookup.ds_attr_array = (PVFS_ds_attributes *)
163        malloc(s_op->u.lookup.seg_ct * sizeof(PVFS_ds_attributes));
164    if(!s_op->u.lookup.ds_attr_array)
165    {
166        js_p->error_code = -PVFS_ENOMEM;
167        return 1;
168    }
169
170    /* allocate memory
171     *
172     * Note: all memory is allocated in a single block,
173     * pointed to by s_op->resp.u.lookup_path.handle_array
174     */
175    ptr = malloc(s_op->u.lookup.seg_ct *
176                 (sizeof(PVFS_handle) + sizeof(PVFS_object_attr)));
177    if (!ptr)
178    {
179        js_p->error_code = -PVFS_ENOMEM;
180        return SM_ACTION_COMPLETE;
181    }
182
183    s_op->resp.u.lookup_path.handle_array = (PVFS_handle *)ptr;
184    ptr += (s_op->u.lookup.seg_ct * sizeof(PVFS_handle));
185
186    s_op->resp.u.lookup_path.attr_array = (PVFS_object_attr *)ptr;
187
188    js_p->error_code = 0;
189    return SM_ACTION_COMPLETE;
190}
191
192/*
193 * Function: lookup_read_object_metadata
194 *
195 * Synopsis: Given an object handle, looks up the attributes
196 * (metadata) for that handle.
197 *
198 * Initializes key and value structures to direct metadata:
199 * - if this is the starting (base) handle, store in
200 *   s_op->u.lookup.base_attr
201 * - otherwise store it in the appropriate slot in the resp handle array
202 *
203 * Posts the keyval read to trove.
204 */
205static PINT_sm_action lookup_read_object_metadata(
206        struct PINT_smcb *smcb, job_status_s *js_p)
207{
208    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
209    int ret = -PVFS_EINVAL;
210    job_id_t j_id;
211    PVFS_handle handle = PVFS_HANDLE_NULL;
212    PVFS_ds_attributes *ds_attr = NULL;
213
214    assert(s_op->u.lookup.seg_nr <= s_op->u.lookup.seg_ct);
215
216    /* use the base handle if we haven't looked up a segment yet */
217    if (s_op->u.lookup.seg_nr == 0)
218    {
219        handle = s_op->req->u.lookup_path.handle;
220        ds_attr = &(s_op->u.lookup.ds_attr_array[0]);
221    }
222    else
223    {
224        handle = s_op->resp.u.lookup_path.handle_array[
225            s_op->u.lookup.seg_nr-1];
226        ds_attr = &(s_op->u.lookup.ds_attr_array[
227                        s_op->u.lookup.seg_nr - 1]);
228    }
229
230    /* update our successful handle read count */
231    s_op->u.lookup.handle_ct++;
232
233    /* Copy the fsid and handle to the s_op structure for the acl check */
234    s_op->target_handle = handle;
235    s_op->target_fs_id = s_op->req->u.lookup_path.fs_id;
236
237    /* get the dspace attributes/metadata */
238    ret = job_trove_dspace_getattr(
239        s_op->req->u.lookup_path.fs_id, handle, smcb, ds_attr,
240        0, js_p, &j_id, server_job_context, s_op->req->hints );
241
242    return ret;
243}
244
245/*
246 * Function: lookup_verify_object_metadata
247 *
248 * Synopsis: Examine the metadata returned from the prelude sm.  If
249 * the metadata is for a directory, prepare to read the handle of the
250 * next segment, if there is one.  If the metadata is for a file,
251 * prepare to send a response.
252 *
253 * If the object is a directory, this function sets the
254 * s_op->u.lookup.segp value to point to the next segment to look up;
255 * this is used in lookup_read_directory_entry.
256 *
257 * This function does not post an operation, but rather returns 1
258 * immediately.
259 */
260static PINT_sm_action lookup_verify_object_metadata(
261        struct PINT_smcb *smcb, job_status_s *js_p)
262{
263    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
264    int ret = -PVFS_EINVAL;
265    PVFS_object_attr *a_p = NULL;
266    PVFS_ds_attributes *ds_attr = NULL;
267
268    if (s_op->u.lookup.seg_nr == 0)
269    {
270        a_p = &s_op->attr;
271        ds_attr = &(s_op->u.lookup.ds_attr_array[0]);
272    }
273    else
274    {
275        a_p = &s_op->resp.u.lookup_path.attr_array[
276            s_op->u.lookup.seg_nr - 1];
277        ds_attr = &(s_op->u.lookup.ds_attr_array[
278                        s_op->u.lookup.seg_nr - 1]);
279    }
280
281    PVFS_ds_attr_to_object_attr(ds_attr, a_p);
282    a_p->mask = PVFS_ATTR_COMMON_ALL;
283    s_op->target_object_attr = a_p;
284
285    /* update our successful attr read count */
286    s_op->u.lookup.attr_ct++;
287
288    assert(((a_p->objtype == PVFS_TYPE_DIRECTORY) ||
289            (a_p->objtype == PVFS_TYPE_METAFILE)  ||
290            (a_p->objtype == PVFS_TYPE_SYMLINK)));
291
292    gossip_debug(
293        GOSSIP_SERVER_DEBUG, "  attrs = (owner = %d, group = %d, "
294        "perms = %o, type = %d)\n", a_p->owner, a_p->group,
295        a_p->perms, a_p->objtype);
296
297    /* if we hit a metafile, we are done */
298    if (a_p->objtype == PVFS_TYPE_METAFILE)
299    {
300        gossip_debug(GOSSIP_SERVER_DEBUG, "  object is a metafile; "
301                     "halting lookup and sending response\n");
302
303        js_p->error_code = STATE_ENOTDIR;
304        return SM_ACTION_COMPLETE;
305    }
306
307    /*
308      if we hit a symlink, we're done; client will pick up the pieces
309      and continue to resolve the symlink if required
310    */
311    if (a_p->objtype == PVFS_TYPE_SYMLINK)
312    {
313        gossip_debug(GOSSIP_SERVER_DEBUG, "  object is a symlink; "
314                     "halting lookup and sending response\n");
315
316        js_p->error_code = STATE_ENOTDIR;
317        return SM_ACTION_COMPLETE;
318    }
319
320    /* if we looked up all the segments, we are done */
321    if (s_op->u.lookup.seg_nr == s_op->u.lookup.seg_ct)
322    {
323        gossip_debug(GOSSIP_SERVER_DEBUG, "  no more segments in path; "
324                     "sending response\n");
325
326        js_p->error_code = STATE_NOMORESEGS;
327        return SM_ACTION_COMPLETE;
328    }
329
330    /* if we reach this point, the object is a directory.  Verify that we
331     * have execute permission on the directory before continuing traversal
332     */
333    js_p->error_code = PINT_check_mode(a_p, s_op->req->credentials.uid,
334        s_op->req->credentials.gid, PINT_ACCESS_EXECUTABLE);
335    if(js_p->error_code != 0)
336    {
337        /* doesn't look like we have permission to traverse directory; bail
338         * out
339         */
340        js_p->error_code = LOOKUP_CHECK_DIR_ACLS;
341        return SM_ACTION_COMPLETE;
342    }
343
344    /* find the segment that we should look up in the directory */
345    ret = PINT_string_next_segment(
346        s_op->req->u.lookup_path.path, &s_op->u.lookup.segp,
347        &s_op->u.lookup.segstate);
348
349    if(ret != 0)
350    {
351        gossip_err("PINT_string_next_segment failed: path: %s\n",
352                   s_op->req->u.lookup_path.path);
353    }
354    assert(ret == 0);
355
356    gossip_debug(GOSSIP_SERVER_DEBUG, "  object is a directory; will be "
357                 "looking for handle for segment \"%s\" in a bit\n",
358                 s_op->u.lookup.segp);
359
360
361    return SM_ACTION_COMPLETE;
362}
363
364/*
365 * Post a keyval DB read of the posix acls to check and see if
366 * directory traversal is allowed or not
367 */
368static PINT_sm_action lookup_check_acls_if_needed(
369    struct PINT_smcb *smcb, job_status_s* js_p)
370{
371    int ret = -PVFS_EINVAL;
372    job_id_t i;
373    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
374
375    /* If we get here with an invalid fsid and handle, we have to
376     * return -PVFS_EACCESS
377     */
378    if (s_op->target_fs_id == PVFS_FS_ID_NULL
379        || s_op->target_handle == PVFS_HANDLE_NULL)
380    {
381        js_p->error_code = -PVFS_EACCES;
382        return SM_ACTION_COMPLETE;
383    }
384    js_p->error_code = 0;
385
386    memset(&s_op->key, 0, sizeof(PVFS_ds_keyval));
387    memset(&s_op->val, 0, sizeof(PVFS_ds_keyval));
388    s_op->key.buffer = "system.posix_acl_access";
389    s_op->key.buffer_sz = strlen(s_op->key.buffer) + 1;
390    s_op->val.buffer = (char *) malloc(PVFS_REQ_LIMIT_VAL_LEN);
391    if (!s_op->val.buffer)
392    {
393        js_p->error_code = -PVFS_ENOMEM;
394        return SM_ACTION_COMPLETE;
395    }
396    s_op->val.buffer_sz = PVFS_REQ_LIMIT_VAL_LEN;
397
398    gossip_debug(GOSSIP_PERMISSIONS_DEBUG, "About to retrieve acl keyvals "
399                 "for handle %llu\n", llu(s_op->target_handle));
400
401    /* Read acl keys */
402    ret = job_trove_keyval_read(
403        s_op->target_fs_id,
404        s_op->target_handle,
405        &s_op->key,
406        &s_op->val,
407        0,
408        NULL,
409        s_op,
410        0,
411        js_p,
412        &i,
413        server_job_context, s_op->req->hints);
414    return ret;
415}
416
417/*
418 * Verify if the completed keyval DB operation allows the lookup
419 * to proceed or not. i.e. executable privileges on directory
420 * for the requesting user or not.
421 */
422static PINT_sm_action lookup_check_acls(
423    struct PINT_smcb *smcb, job_status_s* js_p)
424{
425    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
426    PVFS_object_attr *obj_attr = NULL;
427    int want = PVFS2_ACL_EXECUTE;
428
429    /* The dspace attr must have been read at this point */
430    obj_attr = s_op->target_object_attr;
431    assert(obj_attr);
432
433    /* anything non-zero we treat as a real error */
434    if (js_p->error_code)
435    {
436        goto cleanup;
437    }
438    /* ok; let the actual acl check be done */
439    js_p->error_code = PINT_check_acls(s_op->val.buffer,
440                        s_op->val.read_sz,
441                        obj_attr,
442                        s_op->req->credentials.uid,
443                        s_op->req->credentials.gid,
444                        want);
445    /* if we are good to go,
446       find the segment that we should look up in the directory */
447    if (js_p->error_code == 0)
448    {
449        js_p->error_code = PINT_string_next_segment(
450            s_op->req->u.lookup_path.path, &s_op->u.lookup.segp,
451            &s_op->u.lookup.segstate);
452
453        if(js_p->error_code != 0)
454        {
455            gossip_err("PINT_string_next_segment failed to get the"
456                       "next segment to lookup from the path: %s\n",
457                       s_op->req->u.lookup_path.path);
458        }
459        else
460        {
461            gossip_debug(GOSSIP_SERVER_DEBUG, "  after ACL check "
462                         "object is a directory; will be "
463                         "looking for handle for segment \"%s\" in a bit\n",
464                         s_op->u.lookup.segp);
465        }
466    }
467cleanup:
468    if (s_op->val.buffer)
469        free(s_op->val.buffer);
470    memset(&s_op->key, 0, sizeof(PVFS_ds_keyval));
471    memset(&s_op->val, 0, sizeof(PVFS_ds_keyval));
472    return SM_ACTION_COMPLETE;
473}
474
475/*
476 * Function: lookup_read_directory_entry_handle
477 *
478 * Synopsis: Given a directory handle, look up the handle used to
479 * store directory entries for this directory.
480 *
481 * Initializes key and value structures to direct handle into
482 * s_op->u.lookup.dirent_handle, which is where we always store the
483 * handle used to read directory entries.  The handle to use for the
484 * read is either:
485 * - the starting handle from the req (if we haven't looked up a
486 *   segment yet), or
487 * - the previous segment's handle (from response handle array).
488 *
489 * Posts the keyval read to trove.
490 */
491static PINT_sm_action lookup_read_directory_entry_handle(
492        struct PINT_smcb *smcb, job_status_s *js_p)
493{
494    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
495    int ret = -PVFS_EINVAL;
496    PVFS_handle handle = PVFS_HANDLE_NULL;
497    job_id_t j_id;
498
499    /* use the base handle if we haven't looked up a segment yet */
500    if (s_op->u.lookup.seg_nr == 0)
501    {
502        handle = s_op->req->u.lookup_path.handle;
503    }
504    else
505    {
506        handle = s_op->resp.u.lookup_path.handle_array[
507            s_op->u.lookup.seg_nr-1];
508    }
509
510    gossip_debug(GOSSIP_SERVER_DEBUG,
511                 "  reading dirent handle value from handle %llu\n",
512                 llu(handle));
513
514    s_op->key.buffer = Trove_Common_Keys[DIR_ENT_KEY].key;
515    s_op->key.buffer_sz = Trove_Common_Keys[DIR_ENT_KEY].size;
516    s_op->val.buffer = &s_op->u.lookup.dirent_handle;
517    s_op->val.buffer_sz = sizeof(PVFS_handle);
518
519    ret = job_trove_keyval_read(
520        s_op->req->u.lookup_path.fs_id, handle, &s_op->key, &s_op->val,
521        0,
522        NULL, smcb, 0, js_p, &j_id, server_job_context, s_op->req->hints);
523
524    return ret;
525}
526
527/*
528 * Function: lookup_read_directory_entry
529 *
530 * Synopsis: Given a handle for a dspace holding directory entries,
531 * look up the current segment and obtain its handle.
532 */
533static PINT_sm_action lookup_read_directory_entry(
534        struct PINT_smcb *smcb, job_status_s *js_p)
535{
536    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
537    int ret = -PVFS_EINVAL;
538    job_id_t j_id;
539
540    gossip_debug(
541        GOSSIP_SERVER_DEBUG, "  reading from dirent handle = "
542        "%llu, segment = %s (len=%d)\n", llu(s_op->u.lookup.dirent_handle),
543        s_op->u.lookup.segp, (int) strlen(s_op->u.lookup.segp));
544
545    s_op->key.buffer = s_op->u.lookup.segp;
546    s_op->key.buffer_sz = strlen(s_op->u.lookup.segp) + 1;
547    s_op->val.buffer =
548        &s_op->resp.u.lookup_path.handle_array[s_op->u.lookup.seg_nr];
549    s_op->val.buffer_sz = sizeof(PVFS_handle);
550
551    /*
552      NOTE: if this operation fails, seg_nr will indicate one too many
553      valid segments; this is addressed in lookup_send_response.
554    */
555    s_op->u.lookup.seg_nr++;
556
557    ret = job_trove_keyval_read(
558        s_op->req->u.lookup_path.fs_id, s_op->u.lookup.dirent_handle,
559        &s_op->key, &s_op->val,
560        0,
561        NULL, smcb, 0, js_p, &j_id,
562        server_job_context, s_op->req->hints);
563
564    return ret;
565}
566
567
568static PINT_sm_action lookup_setup_resp(
569        struct PINT_smcb *smcb, job_status_s *js_p)
570{
571    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
572
573    /*
574      NOTE: we may have handle_count N with attr_count N-1 in the case
575      that another meta-server needs to continue with the path (attr)
576      lookup.  otherwise, we're returning N handles with N attrs.
577
578      # actually completed are one less than the respective counts
579    */
580    s_op->resp.u.lookup_path.handle_count = s_op->u.lookup.handle_ct - 1;
581    s_op->resp.u.lookup_path.attr_count = s_op->u.lookup.attr_ct - 1;
582
583    if (s_op->resp.u.lookup_path.handle_count ||
584        s_op->resp.u.lookup_path.attr_count)
585    {
586        js_p->error_code = 0;
587    }
588    else if(js_p->error_code < 0)
589    {
590        /* preserve error code in this case and fall through */
591        gossip_debug(GOSSIP_SERVER_DEBUG, "  lookup error in previous step\n");
592    }
593    else
594    {
595        js_p->error_code = -PVFS_ENOENT;
596    }
597
598    gossip_debug(GOSSIP_SERVER_DEBUG, "  sending '%s' response with %d "
599                 "handle(s) and %d attr(s)\n",
600                 (js_p->error_code ? "error" : "success"),
601                 s_op->resp.u.lookup_path.handle_count,
602                 s_op->resp.u.lookup_path.attr_count);
603
604    if(js_p->error_code == 0)
605    {
606        PINT_ACCESS_DEBUG(
607            s_op, GOSSIP_ACCESS_DEBUG, "path: %s, handle: %llu\n",
608            s_op->req->u.lookup_path.path,
609            llu(s_op->resp.u.lookup_path.handle_array[
610                s_op->resp.u.lookup_path.handle_count-1]));
611    }
612    else
613    {
614        PINT_ACCESS_DEBUG(
615            s_op, GOSSIP_ACCESS_DEBUG, "path: %s, lookup failed\n",
616            s_op->req->u.lookup_path.path);
617    }
618
619    return SM_ACTION_COMPLETE;
620}
621
622/*
623 * Function: lookup_cleanup
624 *
625 * Synopsis: Free memory allocated during request processing.
626 *
627 * There are a bunch of regions that must be freed after processing
628 * completes:
629 * - decoded request (s_op->decoded)
630 * - encoded request (s_op->unexp_bmi_buff.buffer)
631 * - encoded response (s_op->encoded)
632 * - original (decoded) response (s_op->resp)
633 * - dynamically allocated space (in this case
634 *   s_op->resp.u.lookup_path.handle_array)
635 * - the server operation structure itself
636 */
637static PINT_sm_action lookup_cleanup(
638        struct PINT_smcb *smcb, job_status_s *js_p)
639{
640    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
641    if (s_op->resp.u.lookup_path.handle_array)
642    {
643        free(s_op->resp.u.lookup_path.handle_array);
644        s_op->resp.u.lookup_path.handle_array = NULL;
645    }
646
647    if (s_op->u.lookup.ds_attr_array)
648    {
649        free(s_op->u.lookup.ds_attr_array);
650        s_op->u.lookup.ds_attr_array = NULL;
651    }
652    return(server_state_machine_complete(smcb));
653}
654
655PINT_GET_OBJECT_REF_DEFINE(lookup_path);
656
657struct PINT_server_req_params pvfs2_lookup_params =
658{
659    .string_name = "lookup_path",
660    .perm = PINT_SERVER_CHECK_NONE,
661    .sched_policy = PINT_SERVER_REQ_SCHEDULE,
662    .get_object_ref = PINT_get_object_ref_lookup_path,
663    .state_machine = &pvfs2_lookup_sm
664};
665
666/*
667 * Local variables:
668 *  mode: c
669 *  c-indent-level: 4
670 *  c-basic-offset: 4
671 * End:
672 *
673 * vim: ft=c ts=8 sts=4 sw=4 expandtab
674 */
Note: See TracBrowser for help on using the browser.