Changeset 7827

Show
Ignore:
Timestamp:
06/17/09 12:17:31 (4 years ago)
Author:
sson
Message:

Updated the buffer copy method from the nested state machine to parent.

Location:
branches/as-branch/src/server
Files:
4 modified

Legend:

Unmodified
Added
Removed
  • branches/as-branch/src/server/allreduce.sm

    r7823 r7827  
    140140    double *a = s_op->u.allreduce.send_buf; 
    141141    double *b = s_op->u.allreduce.recv_buf; 
    142     double tmp; 
    143     js_p->error_code = 0; 
    144142 
    145143    s_op->u.allreduce.current_depth++; 
     
    147145 
    148146    a[0] = a[0] + b[0]; 
    149     s_op->u.allreduce.send_buf = (void*)a; 
     147    memcpy(s_op->u.allreduce.send_buf, (void*)a, sizeof(double)); 
    150148 
    151149    gossip_debug(GOSSIP_IO_DEBUG, "received=%lf\n", *b); 
     
    155153    if(s_op->u.allreduce.current_depth < s_op->u.allreduce.tree_depth) 
    156154        js_p->error_code = CONTINUE_ALLREDUCE; 
     155    else 
     156        js_p->error_code = 0; 
    157157 
    158     gossip_debug(GOSSIP_IO_DEBUG, "%s: reduced result=%lf\n", __func__, 
    159                  *a); 
     158    gossip_debug(GOSSIP_IO_DEBUG, "%s: reduced result=%lf\n",  
     159                 __func__, *a); 
    160160 
    161161    return SM_ACTION_COMPLETE; 
  • branches/as-branch/src/server/io.sm

    r7826 r7827  
    200200    gossip_debug(GOSSIP_IO_DEBUG, "%s: tag=%d\n", __func__, s_op->tag); 
    201201    s_op->u.io.user_ptr = NULL; 
    202     s_op->u.io.op = s_op->req->u.io.op; /* AS: operation */ 
    203     s_op->u.io.datatype = s_op->req->u.io.datatype; /* AS: dtype */ 
    204     s_op->u.io.dfile_array  = s_op->req->u.io.dfile_array; /* AD: dfile_array */ 
    205202     
    206203    gossip_debug(GOSSIP_IO_DEBUG, "server_ct=%d\n", s_op->req->u.io.server_ct); 
     
    208205    for(i=0; i<s_op->req->u.io.server_ct; i++) { 
    209206        gossip_debug(GOSSIP_IO_DEBUG, "dfile_array[%d]=%llu\n", i,  
    210                      llu(s_op->u.io.dfile_array[i])); 
    211         if(s_op->u.io.dfile_array[i] == s_op->req->u.io.handle) { 
     207                     llu(s_op->req->u.io.dfile_array[i])); 
     208        if(s_op->req->u.io.dfile_array[i] == s_op->req->u.io.handle) { 
    212209            dfile_index = i; 
    213210            gossip_debug(GOSSIP_IO_DEBUG, "dfile_index=%d\n", i); 
     
    260257    if(!fs_config) { 
    261258        gossip_err("%s: Failed to get filesystem " 
    262                    "config from fs_id of: %d\n", __func__, 
     259d                  "config from fs_id of: %d\n", __func__, 
    263260                   s_op->u.pipeline.fs_id); 
    264261        js_p->error_code = -PVFS_EINVAL; 
     
    289286        memset(pipeline_op, 0, sizeof(*pipeline_op)); 
    290287 
     288        pipeline_op->u.pipeline.op = s_op->req->u.io.op; 
     289        pipeline_op->u.pipeline.datatype = s_op->req->u.io.datatype; 
     290 
    291291        pipeline_op->u.pipeline.address = s_op->addr; 
    292292        pipeline_op->u.pipeline.handle = s_op->req->u.io.handle; 
     
    295295        pipeline_op->u.pipeline.seg_handle = seg_handle; 
    296296        pipeline_op->u.pipeline.id = id[i]; 
    297         //pipeline_op->u.pipeline.parent = s_op; 
    298297        pipeline_op->u.pipeline.hints = s_op->req->hints; 
    299298        pipeline_op->u.pipeline.tag = s_op->tag; 
     
    304303        pipeline_op->u.pipeline.dfile_count = s_op->u.io.file_data.server_ct; 
    305304        pipeline_op->u.pipeline.dist = s_op->u.io.file_data.dist; 
     305        pipeline_op->u.pipeline.dfile_array = s_op->req->u.io.dfile_array; 
     306        pipeline_op->u.pipeline.file_data = s_op->u.io.file_data; 
     307        pipeline_op->u.pipeline.file_req_offset = s_op->u.io.file_req_offset; 
    306308 
    307309        /* figure out if the fs config has trove data sync turned on or off 
     
    382384        if(pipeline_op->u.pipeline.buffer) { 
    383385            if(s_op->req->u.io.io_type == PVFS_IO_READ) { 
     386                memcpy(s_op->u.io.tmp_buffer, pipeline_op->u.pipeline.buffer, sizeof(double)); 
    384387                BMI_memfree(pipeline_op->u.pipeline.address,  
    385388                            pipeline_op->u.pipeline.buffer,  
     
    575578            s_op->u.io.total_transferred; /* AS */ 
    576579 
    577     gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%lld\n", __func__, 
    578     lld(s_op->u.io.total_transferred)); err = PINT_encode( 
     580    gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%lld\n",  
     581                 __func__, lld(s_op->u.io.total_transferred));  
     582    err = PINT_encode( 
    579583        &s_op->resp, PINT_ENCODE_RESP, &(s_op->encoded), 
    580584        s_op->addr, s_op->decoded.enc_type); 
    581585 
    582     gossip_debug(GOSSIP_IO_DEBUG, "%s: err=%d\n", __func__, err); // err = 0 
    583586    if (err < 0) 
    584587    { 
     
    597600    gossip_debug(GOSSIP_IO_DEBUG, "%s: err=%d\n", __func__, err); // err = 1 
    598601 
    599     return err; 
     602    js_p->error_code = err; 
     603    return SM_ACTION_COMPLETE; 
    600604} 
    601605 
  • branches/as-branch/src/server/pipeline.sm

    r7826 r7827  
    3131#define DO_COMP 103 
    3232#define DO_ALLREDUCE 104 
    33 #define CONTINUE_ALLREDUCE 105 
    3433 
    3534static int s2s_comp_fn( 
     
    123122    job_id_t tmp_id; 
    124123 
    125     gossip_debug(GOSSIP_IO_DEBUG, "smcb->base_frame=%d, frame_count=%d\n", smcb->base_frame, smcb->frame_count); 
     124    gossip_debug(GOSSIP_IO_DEBUG, "smcb->base_frame=%d, frame_count=%d\n",  
     125                 smcb->base_frame, smcb->frame_count); 
    126126    s_op->u.pipeline.buffer_used = 0; 
    127127    bytes = s_op->u.pipeline.buffer_size; 
     
    220220    job_id_t tmp_id; 
    221221    struct server_configuration_s *user_opts = get_server_config_struct(); 
    222     struct PINT_server_op *parent_s_op = PINT_sm_frame(smcb->parent_smcb, 0); 
    223222 
    224223    if(s_op->u.pipeline.segs == 0) { 
     
    240239        assert(s_op->u.pipeline.buffer_used); 
    241240 
    242         if(parent_s_op->u.io.op != 0) { /* AS: when op is specified */ 
     241        if(s_op->u.pipeline.op != 0) { /* AS: when op is specified */ 
    243242            ret = DO_COMP; /* AS: skip sending if op is specified */  
    244243            gossip_debug(GOSSIP_IO_DEBUG, "%s: parent->op != 0\n", __func__); 
     
    309308{ 
    310309    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT); 
    311     struct PINT_server_op *parent_s_op = PINT_sm_frame(smcb->parent_smcb, 0); 
    312310    js_p->error_code = 0; 
    313     PVFS_size file_req_offset = parent_s_op->u.io.file_req_offset; 
    314     PINT_request_file_data fdata = parent_s_op->u.io.file_data; 
     311    PVFS_size file_req_offset = s_op->u.pipeline.file_req_offset; 
     312    PINT_request_file_data fdata = s_op->u.pipeline.file_data; 
    315313    PVFS_simple_stripe_params *dparam =  
    316314        (PVFS_simple_stripe_params*)fdata.dist->params; 
     
    323321    gossip_debug(GOSSIP_IO_DEBUG, "loff=%lld, file_req_offset=%lld\n", lld(loff), lld(file_req_offset)); 
    324322 
    325     switch(parent_s_op->u.io.datatype) { 
     323    switch(s_op->u.pipeline.datatype) { 
    326324    case ((int)0x4c000405): /* MPI_INT */ 
    327325        count = (PVFS_size)(s_op->u.pipeline.buffer_used -  
     
    365363{ 
    366364    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT); 
    367     struct PINT_server_op *parent_s_op = PINT_sm_frame(smcb->parent_smcb, 0); 
    368365    PINT_sm_msgpair_state *msg_p = NULL; 
    369366    struct server_configuration_s *user_opts = get_server_config_struct(); 
     
    388385    /* determine which server we need to talk to */ 
    389386    next_server_index = (s_op->u.pipeline.dfile_index + 1)%(s_op->u.pipeline.dfile_count); 
    390     next_server_handle = parent_s_op->u.io.dfile_array[next_server_index]; 
     387    next_server_handle = s_op->u.pipeline.dfile_array[next_server_index]; 
    391388 
    392389    /* build a request */ 
     
    511508                     "%s: buffer_used=%lld, op=0x%x, datatype=0x%x, actual_size=%lld\n",  
    512509                     __func__, lld(s_op->u.pipeline.buffer_used),  
    513                      parent_s_op->u.io.op,  
    514                      parent_s_op->u.io.datatype, 
     510                     s_op->u.pipeline.op,  
     511                     s_op->u.pipeline.datatype, 
    515512                     lld(js_p->actual_size)); /* AS */ 
    516513 
    517         switch(parent_s_op->u.io.datatype) { 
     514        switch(s_op->u.pipeline.datatype) { 
    518515        case ((int)0x4c000405): /* MPI_INT */ 
    519516            { 
     
    541538                gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "count=%lld, tmp=%d\n", lld(count), *tmp); 
    542539 
    543                 switch(parent_s_op->u.io.op) { 
     540                switch(s_op->u.pipeline.op) { 
    544541                case 0x58000001: /* MAX */ 
    545542                    result = *a; 
     
    653650                gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "count=%lld, tmp=%lf\n",  
    654651                             lld(count), *tmp); 
    655                 switch(parent_s_op->u.io.op) { 
     652                switch(s_op->u.pipeline.op) { 
    656653                case 0x58000001: /* MAX */ 
    657654                    result = *a; 
     
    753750{ 
    754751    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT); 
    755     struct PINT_server_op *parent_s_op = PINT_sm_frame(smcb->parent_smcb, 0); 
    756752    int ret, tmp_id; 
    757753    struct server_configuration_s *user_opts = get_server_config_struct(); 
     
    767763    allreduce_op->u.allreduce.recv_buf = (void*)malloc(5*sizeof(double)); 
    768764    memset(allreduce_op->u.allreduce.recv_buf, 0, 5*sizeof(double)); 
    769     allreduce_op->u.allreduce.tree_depth = log2(s_op->u.pipeline.dfile_count); /* FIXME !!!! */ 
     765    allreduce_op->u.allreduce.tree_depth = log2(s_op->u.pipeline.dfile_count); 
    770766    gossip_debug(GOSSIP_IO_DEBUG, "tree_depth=%d\n",  
    771767                 allreduce_op->u.allreduce.tree_depth); 
    772768    allreduce_op->u.allreduce.current_depth = 0; 
    773769    allreduce_op->u.allreduce.mask = 0x1; 
    774     allreduce_op->u.allreduce.dfile_array = parent_s_op->u.io.dfile_array; 
     770    allreduce_op->u.allreduce.dfile_array = s_op->u.pipeline.dfile_array; 
    775771    allreduce_op->u.allreduce.send_buf = (void*)malloc(5*sizeof(double)); 
    776772    memset(allreduce_op->u.allreduce.send_buf, 0, 5*sizeof(double)); 
     
    824820    /* FIXME: */ 
    825821    /* unless the second condition is set, the server starts 
    826        a new read request to one already done, and falls into 
     822       a new read request that is already done, and falls into 
    827823       infinite trove_read->bmi_send->check_done loop */ 
    828824    if(!segpool_done(h) && s_op->u.pipeline.segs != 0) { 
     
    831827    } 
    832828    else { 
    833         if(parent_s_op->u.io.op == (0x5800000f)) { /* KMEANS */ 
    834             parent_s_op->u.io.tmp_buffer = (void *)s_op->u.pipeline.buffer; 
     829#if 0 
     830        if(s_op->u.pipeline.op == (0x5800000f)) { /* KMEANS */ 
     831            memcpy(parent_s_op->u.io.tmp_buffer,  
     832                   s_op->u.pipeline.buffer, sizeof(double)); 
    835833        } 
     834#endif 
    836835        gossip_debug(GOSSIP_IO_DEBUG, "%s: DONE\n", __func__); 
    837836    } 
  • branches/as-branch/src/server/pvfs2-server.h

    r7826 r7827  
    342342    void *user_ptr; 
    343343 
    344     int op; 
    345     int datatype; 
    346344    void *tmp_buffer; 
    347345    PVFS_size count; /* for MEAN operation */ 
    348     PVFS_handle *dfile_array; 
    349346 
    350347    PVFS_size aggregate_size; 
     
    367364    PVFS_BMI_addr_t address; 
    368365 
     366    PVFS_handle *dfile_array; 
    369367    int dfile_index; /* can be used for Rank */ 
    370368    int dfile_count; 
    371369    struct PINT_dist_s *dist; 
     370    PINT_request_file_data file_data; 
    372371 
    373372    PINT_Request *file_req; 
     
    375374    PINT_Request *mem_req; 
    376375 
     376    /* for strip alignment */ 
    377377    char tmp_buf[128]; /* FIXME */ 
    378378    PVFS_size unaligned_size; 
    379379 
    380380    enum PVFS_io_type io_type; 
     381 
     382    /* AS: operator and data type */ 
     383    int op; 
     384    int datatype; 
    381385 
    382386    char *buffer;