Changeset 7827
- Timestamp:
- 06/17/09 12:17:31 (4 years ago)
- Location:
- branches/as-branch/src/server
- Files:
-
- 4 modified
-
allreduce.sm (modified) (3 diffs)
-
io.sm (modified) (9 diffs)
-
pipeline.sm (modified) (15 diffs)
-
pvfs2-server.h (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
-
branches/as-branch/src/server/allreduce.sm
r7823 r7827 140 140 double *a = s_op->u.allreduce.send_buf; 141 141 double *b = s_op->u.allreduce.recv_buf; 142 double tmp;143 js_p->error_code = 0;144 142 145 143 s_op->u.allreduce.current_depth++; … … 147 145 148 146 a[0] = a[0] + b[0]; 149 s_op->u.allreduce.send_buf = (void*)a;147 memcpy(s_op->u.allreduce.send_buf, (void*)a, sizeof(double)); 150 148 151 149 gossip_debug(GOSSIP_IO_DEBUG, "received=%lf\n", *b); … … 155 153 if(s_op->u.allreduce.current_depth < s_op->u.allreduce.tree_depth) 156 154 js_p->error_code = CONTINUE_ALLREDUCE; 155 else 156 js_p->error_code = 0; 157 157 158 gossip_debug(GOSSIP_IO_DEBUG, "%s: reduced result=%lf\n", __func__,159 *a);158 gossip_debug(GOSSIP_IO_DEBUG, "%s: reduced result=%lf\n", 159 __func__, *a); 160 160 161 161 return SM_ACTION_COMPLETE; -
branches/as-branch/src/server/io.sm
r7826 r7827 200 200 gossip_debug(GOSSIP_IO_DEBUG, "%s: tag=%d\n", __func__, s_op->tag); 201 201 s_op->u.io.user_ptr = NULL; 202 s_op->u.io.op = s_op->req->u.io.op; /* AS: operation */203 s_op->u.io.datatype = s_op->req->u.io.datatype; /* AS: dtype */204 s_op->u.io.dfile_array = s_op->req->u.io.dfile_array; /* AD: dfile_array */205 202 206 203 gossip_debug(GOSSIP_IO_DEBUG, "server_ct=%d\n", s_op->req->u.io.server_ct); … … 208 205 for(i=0; i<s_op->req->u.io.server_ct; i++) { 209 206 gossip_debug(GOSSIP_IO_DEBUG, "dfile_array[%d]=%llu\n", i, 210 llu(s_op-> u.io.dfile_array[i]));211 if(s_op-> u.io.dfile_array[i] == s_op->req->u.io.handle) {207 llu(s_op->req->u.io.dfile_array[i])); 208 if(s_op->req->u.io.dfile_array[i] == s_op->req->u.io.handle) { 212 209 dfile_index = i; 213 210 gossip_debug(GOSSIP_IO_DEBUG, "dfile_index=%d\n", i); … … 260 257 if(!fs_config) { 261 258 gossip_err("%s: Failed to get filesystem " 262 "config from fs_id of: %d\n", __func__,259 d "config from fs_id of: %d\n", __func__, 263 260 s_op->u.pipeline.fs_id); 264 261 js_p->error_code = -PVFS_EINVAL; … … 289 286 memset(pipeline_op, 0, sizeof(*pipeline_op)); 290 287 288 pipeline_op->u.pipeline.op = s_op->req->u.io.op; 289 pipeline_op->u.pipeline.datatype = s_op->req->u.io.datatype; 290 291 291 pipeline_op->u.pipeline.address = s_op->addr; 292 292 pipeline_op->u.pipeline.handle = s_op->req->u.io.handle; … … 295 295 pipeline_op->u.pipeline.seg_handle = seg_handle; 296 296 pipeline_op->u.pipeline.id = id[i]; 297 //pipeline_op->u.pipeline.parent = s_op;298 297 pipeline_op->u.pipeline.hints = s_op->req->hints; 299 298 pipeline_op->u.pipeline.tag = s_op->tag; … … 304 303 pipeline_op->u.pipeline.dfile_count = s_op->u.io.file_data.server_ct; 305 304 pipeline_op->u.pipeline.dist = s_op->u.io.file_data.dist; 305 pipeline_op->u.pipeline.dfile_array = s_op->req->u.io.dfile_array; 306 pipeline_op->u.pipeline.file_data = s_op->u.io.file_data; 307 pipeline_op->u.pipeline.file_req_offset = s_op->u.io.file_req_offset; 306 308 307 309 /* figure out if the fs config has trove data sync turned on or off … … 382 384 if(pipeline_op->u.pipeline.buffer) { 383 385 if(s_op->req->u.io.io_type == PVFS_IO_READ) { 386 memcpy(s_op->u.io.tmp_buffer, pipeline_op->u.pipeline.buffer, sizeof(double)); 384 387 BMI_memfree(pipeline_op->u.pipeline.address, 385 388 pipeline_op->u.pipeline.buffer, … … 575 578 s_op->u.io.total_transferred; /* AS */ 576 579 577 gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%lld\n", __func__, 578 lld(s_op->u.io.total_transferred)); err = PINT_encode( 580 gossip_debug(GOSSIP_IO_DEBUG, "%s: total_transferred=%lld\n", 581 __func__, lld(s_op->u.io.total_transferred)); 582 err = PINT_encode( 579 583 &s_op->resp, PINT_ENCODE_RESP, &(s_op->encoded), 580 584 s_op->addr, s_op->decoded.enc_type); 581 585 582 gossip_debug(GOSSIP_IO_DEBUG, "%s: err=%d\n", __func__, err); // err = 0583 586 if (err < 0) 584 587 { … … 597 600 gossip_debug(GOSSIP_IO_DEBUG, "%s: err=%d\n", __func__, err); // err = 1 598 601 599 return err; 602 js_p->error_code = err; 603 return SM_ACTION_COMPLETE; 600 604 } 601 605 -
branches/as-branch/src/server/pipeline.sm
r7826 r7827 31 31 #define DO_COMP 103 32 32 #define DO_ALLREDUCE 104 33 #define CONTINUE_ALLREDUCE 10534 33 35 34 static int s2s_comp_fn( … … 123 122 job_id_t tmp_id; 124 123 125 gossip_debug(GOSSIP_IO_DEBUG, "smcb->base_frame=%d, frame_count=%d\n", smcb->base_frame, smcb->frame_count); 124 gossip_debug(GOSSIP_IO_DEBUG, "smcb->base_frame=%d, frame_count=%d\n", 125 smcb->base_frame, smcb->frame_count); 126 126 s_op->u.pipeline.buffer_used = 0; 127 127 bytes = s_op->u.pipeline.buffer_size; … … 220 220 job_id_t tmp_id; 221 221 struct server_configuration_s *user_opts = get_server_config_struct(); 222 struct PINT_server_op *parent_s_op = PINT_sm_frame(smcb->parent_smcb, 0);223 222 224 223 if(s_op->u.pipeline.segs == 0) { … … 240 239 assert(s_op->u.pipeline.buffer_used); 241 240 242 if( parent_s_op->u.io.op != 0) { /* AS: when op is specified */241 if(s_op->u.pipeline.op != 0) { /* AS: when op is specified */ 243 242 ret = DO_COMP; /* AS: skip sending if op is specified */ 244 243 gossip_debug(GOSSIP_IO_DEBUG, "%s: parent->op != 0\n", __func__); … … 309 308 { 310 309 struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT); 311 struct PINT_server_op *parent_s_op = PINT_sm_frame(smcb->parent_smcb, 0);312 310 js_p->error_code = 0; 313 PVFS_size file_req_offset = parent_s_op->u.io.file_req_offset;314 PINT_request_file_data fdata = parent_s_op->u.io.file_data;311 PVFS_size file_req_offset = s_op->u.pipeline.file_req_offset; 312 PINT_request_file_data fdata = s_op->u.pipeline.file_data; 315 313 PVFS_simple_stripe_params *dparam = 316 314 (PVFS_simple_stripe_params*)fdata.dist->params; … … 323 321 gossip_debug(GOSSIP_IO_DEBUG, "loff=%lld, file_req_offset=%lld\n", lld(loff), lld(file_req_offset)); 324 322 325 switch( parent_s_op->u.io.datatype) {323 switch(s_op->u.pipeline.datatype) { 326 324 case ((int)0x4c000405): /* MPI_INT */ 327 325 count = (PVFS_size)(s_op->u.pipeline.buffer_used - … … 365 363 { 366 364 struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT); 367 struct PINT_server_op *parent_s_op = PINT_sm_frame(smcb->parent_smcb, 0);368 365 PINT_sm_msgpair_state *msg_p = NULL; 369 366 struct server_configuration_s *user_opts = get_server_config_struct(); … … 388 385 /* determine which server we need to talk to */ 389 386 next_server_index = (s_op->u.pipeline.dfile_index + 1)%(s_op->u.pipeline.dfile_count); 390 next_server_handle = parent_s_op->u.io.dfile_array[next_server_index];387 next_server_handle = s_op->u.pipeline.dfile_array[next_server_index]; 391 388 392 389 /* build a request */ … … 511 508 "%s: buffer_used=%lld, op=0x%x, datatype=0x%x, actual_size=%lld\n", 512 509 __func__, lld(s_op->u.pipeline.buffer_used), 513 parent_s_op->u.io.op,514 parent_s_op->u.io.datatype,510 s_op->u.pipeline.op, 511 s_op->u.pipeline.datatype, 515 512 lld(js_p->actual_size)); /* AS */ 516 513 517 switch( parent_s_op->u.io.datatype) {514 switch(s_op->u.pipeline.datatype) { 518 515 case ((int)0x4c000405): /* MPI_INT */ 519 516 { … … 541 538 gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "count=%lld, tmp=%d\n", lld(count), *tmp); 542 539 543 switch( parent_s_op->u.io.op) {540 switch(s_op->u.pipeline.op) { 544 541 case 0x58000001: /* MAX */ 545 542 result = *a; … … 653 650 gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, "count=%lld, tmp=%lf\n", 654 651 lld(count), *tmp); 655 switch( parent_s_op->u.io.op) {652 switch(s_op->u.pipeline.op) { 656 653 case 0x58000001: /* MAX */ 657 654 result = *a; … … 753 750 { 754 751 struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT); 755 struct PINT_server_op *parent_s_op = PINT_sm_frame(smcb->parent_smcb, 0);756 752 int ret, tmp_id; 757 753 struct server_configuration_s *user_opts = get_server_config_struct(); … … 767 763 allreduce_op->u.allreduce.recv_buf = (void*)malloc(5*sizeof(double)); 768 764 memset(allreduce_op->u.allreduce.recv_buf, 0, 5*sizeof(double)); 769 allreduce_op->u.allreduce.tree_depth = log2(s_op->u.pipeline.dfile_count); /* FIXME !!!! */765 allreduce_op->u.allreduce.tree_depth = log2(s_op->u.pipeline.dfile_count); 770 766 gossip_debug(GOSSIP_IO_DEBUG, "tree_depth=%d\n", 771 767 allreduce_op->u.allreduce.tree_depth); 772 768 allreduce_op->u.allreduce.current_depth = 0; 773 769 allreduce_op->u.allreduce.mask = 0x1; 774 allreduce_op->u.allreduce.dfile_array = parent_s_op->u.io.dfile_array;770 allreduce_op->u.allreduce.dfile_array = s_op->u.pipeline.dfile_array; 775 771 allreduce_op->u.allreduce.send_buf = (void*)malloc(5*sizeof(double)); 776 772 memset(allreduce_op->u.allreduce.send_buf, 0, 5*sizeof(double)); … … 824 820 /* FIXME: */ 825 821 /* unless the second condition is set, the server starts 826 a new read request t o onealready done, and falls into822 a new read request that is already done, and falls into 827 823 infinite trove_read->bmi_send->check_done loop */ 828 824 if(!segpool_done(h) && s_op->u.pipeline.segs != 0) { … … 831 827 } 832 828 else { 833 if(parent_s_op->u.io.op == (0x5800000f)) { /* KMEANS */ 834 parent_s_op->u.io.tmp_buffer = (void *)s_op->u.pipeline.buffer; 829 #if 0 830 if(s_op->u.pipeline.op == (0x5800000f)) { /* KMEANS */ 831 memcpy(parent_s_op->u.io.tmp_buffer, 832 s_op->u.pipeline.buffer, sizeof(double)); 835 833 } 834 #endif 836 835 gossip_debug(GOSSIP_IO_DEBUG, "%s: DONE\n", __func__); 837 836 } -
branches/as-branch/src/server/pvfs2-server.h
r7826 r7827 342 342 void *user_ptr; 343 343 344 int op;345 int datatype;346 344 void *tmp_buffer; 347 345 PVFS_size count; /* for MEAN operation */ 348 PVFS_handle *dfile_array;349 346 350 347 PVFS_size aggregate_size; … … 367 364 PVFS_BMI_addr_t address; 368 365 366 PVFS_handle *dfile_array; 369 367 int dfile_index; /* can be used for Rank */ 370 368 int dfile_count; 371 369 struct PINT_dist_s *dist; 370 PINT_request_file_data file_data; 372 371 373 372 PINT_Request *file_req; … … 375 374 PINT_Request *mem_req; 376 375 376 /* for strip alignment */ 377 377 char tmp_buf[128]; /* FIXME */ 378 378 PVFS_size unaligned_size; 379 379 380 380 enum PVFS_io_type io_type; 381 382 /* AS: operator and data type */ 383 int op; 384 int datatype; 381 385 382 386 char *buffer;
