/* * (C) 2001 Clemson University and The University of Chicago * * See COPYING in top-level directory. */ /* adding a comment */ #include #include #include "server-config.h" #include "pvfs2-server.h" #include "pvfs2-attr.h" #include "pvfs2-internal.h" #include "pvfs2-util.h" #include "pint-util.h" #include "pint-eattr.h" #include "pint-cached-config.h" #include "pvfs2-dist-basic.h" #include "pvfs2-mirror.h" /*Global Variables*/ static char PVFS_handle_string[PVFS_HANDLE_STRING_LEN]={0}; /*attribute keys used for the mirroring process*/ static char handle_key[] = USER_PVFS2_MIRROR_HANDLES; static char copy_count_key[] = USER_PVFS2_MIRROR_COPIES; static char status_key[] = USER_PVFS2_MIRROR_STATUS; static char mode_key[] = USER_PVFS2_MIRROR_MODE; enum { LOCAL_HANDLES = 100, REMOTE_HANDLES, REPLACE_DONE, LOCAL_SRC, REMOTE_SRC, RETRY, NOTHING_TO_DO }; #define SERVER_NAME_MAX 1024 #define WRITE_RETRY_LIMIT 2 #define DEFAULT_COPIES 1 /*helper macros*/ /*Sets up a two dimensional array from a one dimensional array*/ #define ONE_DIM_TO_TWO_DIMS(in,out,rows,cols,type) \ do { \ int i; \ type *p; \ for (i=0,p=in; i obtain_source_info; default => cleanup; } state obtain_source_info { run obtain_source_info; success => inspect_source_info; default => cleanup; } state inspect_source_info { run inspect_source_info; success => create_local_datahandles; default => cleanup; } state create_local_datahandles { run create_local_datahandles; success => obtain_local_handle_sizes; default => cleanup; } state obtain_local_handle_sizes { run obtain_local_handle_sizes; success => inspect_local_handle_sizes; default => cleanup; } state inspect_local_handle_sizes { run inspect_local_handle_sizes; success => create_remote_datahandles; default => cleanup; } state create_remote_datahandles { run create_remote_datahandles; success => setup_datahandle_copies; default => remove_local_datahandle_objects; } state setup_datahandle_copies { run setup_datahandle_copies; success => copy_data; default => remove_local_datahandle_objects; } state copy_data { pjmp copy_data { LOCAL_SRC => pvfs2_pjmp_mirror_work_sm; REMOTE_SRC => pvfs2_pjmp_call_msgpairarray_sm; } success => check_copy_results; default => cleanup; } state check_copy_results { run check_copy_results; success => store_mirror_info; default => check_for_retries; } state check_for_retries { run check_for_retries; RETRY => copy_data; default => store_mirror_info; } state store_mirror_info { run store_mirror_info; /*default => replace_remote_datahandle_objects;*/ /*If the write of the datahandle information fails, even though the */ /*the copies actually exist, the metadata for the logical file will */ /*NOT have knowledge of it. */ default => check_store_job; } state check_store_job { run check_store_job; default => cleanup; } state replace_remote_datahandle_objects { run replace_remote_datahandle_objects; REPLACE_DONE => remove_local_datahandle_objects; default => replace_remote_datahandle_objects; } state remove_local_datahandle_objects { run remove_local_datahandle_objects; default => cleanup; } state cleanup { run cleanup; default => return; } } /*end nested state machine pvfs2_create_immutable_copies_sm*/ %% /************************************************************************/ /*Actions for pvfs2_create_immutable_copies_sm */ /************************************************************************/ static PINT_sm_action initialize_structures (struct PINT_smcb *smcb ,job_status_s *js_p) { gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing initialize_structures....\n"); gossip_debug(GOSSIP_MIRROR_DEBUG,"\tframe count is %d.\n",smcb->frame_count); gossip_debug(GOSSIP_MIRROR_DEBUG,"\t base frame is %d.\n",smcb->base_frame); struct PINT_server_op *s_op = PINT_sm_frame(smcb,PINT_FRAME_CURRENT); PINT_server_create_copies_op *imm_p = &(s_op->u.create_copies); int ret; js_p->error_code = 0; /* These values are generated by default when the prelude executes. */ /* When seteattr executes, the prelude retrieves the common metadata info. */ imm_p->dfile_count = s_op->target_object_attr->u.meta.dfile_count; PVFS_handle_copy(imm_p->metadata_handle, s_op->target_handle); imm_p->fs_id = s_op->target_fs_id; /* Get the number of IO servers currently running for the given filesystem */ ret = PINT_cached_config_get_num_io(imm_p->fs_id,&(imm_p->io_servers_count)); if (ret) { js_p->error_code = ret; return SM_ACTION_COMPLETE; } PVFS_handle_unparse(imm_p->metadata_handle,PVFS_handle_string); gossip_debug(GOSSIP_MIRROR_DEBUG,"\tdfile_count: %d\tmetadata_handle: %s" "\tfs_id: %u" "\tio_servers_count: %d\n" ,imm_p->dfile_count ,PVFS_handle_string ,imm_p->fs_id ,imm_p->io_servers_count ); gossip_debug(GOSSIP_MIRROR_DEBUG,"\tds_attr.b_size:%d\n" ,(int)s_op->ds_attr.u.datafile.b_size); return SM_ACTION_COMPLETE; } /*end action initialize_structures*/ static PINT_sm_action obtain_source_info (struct PINT_smcb *smcb ,job_status_s *js_p) { /*In this state, we are retrieving the data handles, the number of */ /*desired copies, and the mirroring mode for the given meta data handle.*/ /*If the mirroring mode == NO_MIRRORING, then we will not perform the */ /*mirror operation. */ gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing obtain_source_info....\n"); struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT); PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies); int keyval_count = 3; job_id_t job_id; int ret = 0,i; js_p->error_code = 0; /*allocate space to retrieve attributes from trove.*/ sm_p->keyval_count = keyval_count; sm_p->key_a = malloc(sizeof(*sm_p->key_a) * sm_p->keyval_count); sm_p->val_a = malloc(sizeof(*sm_p->val_a) * sm_p->keyval_count); sm_p->error_a = malloc(sizeof(*sm_p->error_a) * sm_p->keyval_count); if (!sm_p->key_a || !sm_p->val_a || !sm_p->error_a) goto error_exit; memset(sm_p->key_a , 0, sizeof(*sm_p->key_a) * sm_p->keyval_count); memset(sm_p->val_a , 0, sizeof(*sm_p->val_a) * sm_p->keyval_count); memset(sm_p->error_a, 0, sizeof(*sm_p->error_a) * sm_p->keyval_count); /*setup key/val to retreive the mirroring mode*/ i=0; assert(ikey_a[i].buffer = mode_key; sm_p->key_a[i].buffer_sz = sizeof(mode_key); sm_p->val_a[i].buffer = &(imm_p->mirror_mode); sm_p->val_a[i].buffer_sz = sizeof(imm_p->mirror_mode); /*setup key/val to retreive the datahandles*/ i++; assert(ikey_a[i].buffer = Trove_Common_Keys[METAFILE_HANDLES_KEY].key; sm_p->key_a[i].buffer_sz = Trove_Common_Keys[METAFILE_HANDLES_KEY].size; imm_p->handle_array_base = malloc(imm_p->dfile_count * sizeof(PVFS_handle)); if (!imm_p->handle_array_base) goto error_exit; sm_p->val_a[i].buffer = imm_p->handle_array_base; sm_p->val_a[i].buffer_sz = (imm_p->dfile_count * sizeof(PVFS_handle)); /*setup key/val to retreive the number of copies*/ i++; assert(ikey_a[i].buffer = copy_count_key; sm_p->key_a[i].buffer_sz = sizeof(copy_count_key); sm_p->val_a[i].buffer = &(imm_p->copies); sm_p->val_a[i].buffer_sz = sizeof(imm_p->copies); /***** don't need to get file's distriubtion information, because we are */ /***** copying each datahandle, as is, directly into a new datahandle. */ /***** Distribution is only needed when you are modifying the logical */ /***** file, not the individual data handles. However, we need to pro- */ /***** vide a distribution value for the IO request, even though it won't*/ /***** be used. So, instead of issuing a trove call to get the file's */ /***** distribution information, we will be using just the "basic" dis- */ /***** tribution. */ imm_p->dist = malloc(sizeof(PINT_dist)); if (!imm_p->dist) goto error_exit; memset(imm_p->dist,0,sizeof(PINT_dist)); imm_p->dist->dist_name = malloc(PVFS_DIST_BASIC_NAME_SIZE); if (!imm_p->dist->dist_name) goto error_exit; strcpy(imm_p->dist->dist_name,PVFS_DIST_BASIC_NAME); gossip_debug(GOSSIP_MIRROR_DEBUG,"\tdistribution name:%s\n" ,imm_p->dist->dist_name); ret = PINT_dist_lookup(imm_p->dist); if (ret) { gossip_lerr("Error looking up basic distribution:%d",ret); js_p->error_code = ret; goto error_exit; } /*retrieve key/val pairs */ ret = job_trove_keyval_read_list( imm_p->fs_id ,imm_p->metadata_handle ,sm_p->key_a ,sm_p->val_a ,sm_p->error_a ,sm_p->keyval_count ,0 ,NULL ,smcb ,0 ,js_p ,&job_id ,server_job_context ,NULL ); return (ret); error_exit: if (sm_p->key_a) free (sm_p->key_a); if (sm_p->val_a) free (sm_p->val_a); if (sm_p->error_a) free(sm_p->error_a); sm_p->key_a = sm_p->val_a = NULL; sm_p->error_a = NULL; if (imm_p->dist && imm_p->dist->dist_name) free(imm_p->dist->dist_name); if (imm_p->dist) free(imm_p->dist); imm_p->dist = NULL; if (js_p->error_code == 0) js_p->error_code = -PVFS_ENOMEM; return SM_ACTION_COMPLETE; }/*end action obtain_source_info*/ static PINT_sm_action inspect_source_info (struct PINT_smcb *smcb ,job_status_s *js_p) { gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing inspect_source_info....\n"); struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT); PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies); char server_name[SERVER_NAME_MAX] = {0}; server_configuration_s *config = get_server_config_struct(); int ret = 0; int i; /*check error codes from previous trove read-list call. */ for (i=0; ikeyval_count; i++) { /*if the mirroring mode has no entry, the mode=NO_MIRRORING, or the mode*/ /*is not the expected mode, then there is nothing to do. */ if (sm_p->key_a[i].buffer == mode_key) { if (PVFS_get_errno_mapping(sm_p->error_a[i]) == ENOENT) { js_p->error_code = NOTHING_TO_DO; goto error_exit; } imm_p->mirror_mode = *(MIRROR_MODE *)sm_p->val_a[i].buffer; gossip_debug(GOSSIP_MIRROR_DEBUG,"\tRetrieved mirroring mode is %d.\n" ,imm_p->mirror_mode); if (imm_p->mirror_mode == NO_MIRRORING || imm_p->mirror_mode != imm_p->expected_mirror_mode) { js_p->error_code = NOTHING_TO_DO; goto error_exit; } } /*if the user hasn't set the number of copies, this code will */ /*set a default (currently = 1). */ if (sm_p->key_a[i].buffer == copy_count_key) { if (PVFS_get_errno_mapping(sm_p->error_a[i]) == ENOENT) { gossip_lerr("User-defined number of copies not found. " "Defaulting number of copies to %d.\n" ,DEFAULT_COPIES); imm_p->copies = DEFAULT_COPIES; continue; } } /*check for other types of errors.*/ if (sm_p->error_a[i]) { gossip_lerr("Error retrieving value for '%s' : %s\n" ,(char *)sm_p->key_a[i].buffer ,strerror(PVFS_get_errno_mapping(-sm_p->error_a[i]))); js_p->error_code = sm_p->error_a[i]; goto error_exit; } }/*end for*/ gossip_debug(GOSSIP_MIRROR_DEBUG,"\tRetrieved # of copies:%d\n" ,imm_p->copies); /*If there is only one server running, then it makes no sense to create */ /*copies on the same server. */ if (imm_p->io_servers_count == 1) { gossip_lerr("Mirroring operation is not permitted when only one " "I/O server is running.\n"); js_p->error_code = -PVFS_EPERM; return SM_ACTION_COMPLETE; } /* We need at least (# of copies) + 1 I/O servers running in the system to */ /* prevent duplicate data on any one server, while not exceeding the number*/ /* of I/O servers in the system. If the number of copies requested by the */ /* user is >= the number of I/O servers in the system, then we lower the */ /* number of requested copies. We then set the number of I/O servers */ /* required to meet this request with the (new value of copies) + 1. */ /* At this point, if the number of I/O servers required is less than the */ /* number of servers in this file's distribution, then set the number of */ /* required I/O servers to the same number of servers in this file's dis- */ /* tribution. */ if (imm_p->copies >= imm_p->io_servers_count) imm_p->copies = imm_p->io_servers_count - 1; imm_p->io_servers_required = imm_p->copies + 1; if (imm_p->io_servers_required < imm_p->dfile_count) imm_p->io_servers_required = imm_p->dfile_count; /*allocate space for io_servers array. this array will contain the server */ /*names which will be used as the valid destination remotes for the copies.*/ imm_p->io_servers = malloc( imm_p->io_servers_required * sizeof(char *) ); if ( !imm_p->io_servers ) { js_p->error_code = -PVFS_ENOMEM; goto error_exit; } memset(imm_p->io_servers,0,imm_p->io_servers_required * sizeof(char *)); gossip_debug(GOSSIP_MIRROR_DEBUG,"\tAllocated char *:\n"); for (i=0; iio_servers_required; i++) gossip_debug(GOSSIP_MIRROR_DEBUG,"\t io_servers[%d] : %p " "\t &io_servers[%d] : %p\n" ,i,imm_p->io_servers[i] ,i,&(imm_p->io_servers[i])); imm_p->num_io_servers = imm_p->io_servers_required; for (i=0;iio_servers_required;i++) { imm_p->io_servers[i] = malloc( sizeof(server_name) ); if ( !imm_p->io_servers[i] ) { js_p->error_code = -PVFS_ENOMEM; goto error_exit; } memset(imm_p->io_servers[i],0,sizeof(server_name)); } gossip_debug(GOSSIP_MIRROR_DEBUG,"\tAllocated server_name..\n"); for (i=0; iio_servers_required; i++) gossip_debug(GOSSIP_MIRROR_DEBUG,"\t io_servers[%d] : %p " "\t *io_servers[%d] : %s " "\t &io_servers[%d] : %p\n" ,i,imm_p->io_servers[i] ,i,imm_p->io_servers[i] ,i,&(imm_p->io_servers[i])); /*allocate space for the local source handles*/ imm_p->handle_array_base_local = malloc(imm_p->dfile_count * sizeof(PVFS_handle)); if (!imm_p->handle_array_base_local) { js_p->error_code = -PVFS_ENOMEM; goto error_exit; } memset(imm_p->handle_array_base_local ,0 ,imm_p->dfile_count * sizeof(PVFS_handle)); imm_p->handle_array_base_local_count = 0; /*allocate space for the data handle copies array, which will hold a */ /*combination of local and remote data handles used as destination handles */ /*for the copies. The order of this array will mimmick the order of the */ /*original data file array. Thus, handle_array_base[i] */ /*and handle_array_copies[i] will hold handles for the same server number. */ imm_p->handle_array_copies = malloc(imm_p->io_servers_required * imm_p->copies * sizeof(PVFS_handle)); if ( !imm_p->handle_array_copies ) { js_p->error_code = -PVFS_ENOMEM; goto error_exit; } memset(imm_p->handle_array_copies,0,imm_p->io_servers_required * imm_p->copies * sizeof(PVFS_handle)); /*allocate space for the local_io_servers array. this array contains the */ /*server names for local data handles. */ imm_p->local_io_servers = malloc( imm_p->io_servers_required * sizeof(char *)); if ( !imm_p->local_io_servers ) { js_p->error_code = -PVFS_ENOMEM; goto error_exit; } memset(imm_p->local_io_servers ,0 ,imm_p->io_servers_required * sizeof(char *)); imm_p->local_io_servers_count = 0; /*allocate space for the remote_io_servers array. this array contains the */ /*server names for remote data handles. */ imm_p->remote_io_servers = malloc( imm_p->io_servers_required * sizeof(char*)); if ( !imm_p->remote_io_servers ) { js_p->error_code = -PVFS_ENOMEM; goto error_exit; } memset(imm_p->remote_io_servers ,0 ,imm_p->io_servers_required * sizeof(char*)); imm_p->remote_io_servers_count = 0; /*populate the io_servers array with server_names: */ /*Step 1: Always start by using the server names associated with the */ /* original datahandles, keeping the order in tact. */ /*Step 2: If additional servers are needed, then tap into the list of */ /* servers running in the file system and grab those that are not */ /* currently in the io_servers list. */ /*Step 1*/ for (i=0; idfile_count; i++) { ret = PINT_cached_config_get_server_name( imm_p->io_servers[i], sizeof(server_name)-1, imm_p->handle_array_base[i], imm_p->fs_id ); if (ret) { js_p->error_code = ret; goto error_exit; } PVFS_handle_unparse(imm_p->handle_array_base[i],PVFS_handle_string); gossip_debug(GOSSIP_MIRROR_DEBUG,"\tValue of server_name is %s " "for handle %s\n" ,imm_p->io_servers[i] ,PVFS_handle_string); }/*end for*/ /*Step 2*/ if (imm_p->io_servers_required > imm_p->dfile_count) { ret = get_server_names(imm_p); if (ret) { gossip_lerr("Unable to populate io_servers list.\n"); js_p->error_code = ret; goto error_exit; } } gossip_debug(GOSSIP_MIRROR_DEBUG,"\tconfig->host_id is %s.\n" ,config->host_id); gossip_debug(GOSSIP_MIRROR_DEBUG,"\timm_p->io_servers_required : %d\n" ,imm_p->io_servers_required); for (i=0; iio_servers_required; i++) { char *server_name = imm_p->io_servers[i]; if (strncmp(server_name,config->host_id,SERVER_NAME_MAX-1) == 0) {/*local*/ gossip_debug(GOSSIP_MIRROR_DEBUG,"\tprocessing local....\n"); imm_p->local_io_servers[imm_p->handle_array_copies_local_count] = malloc( SERVER_NAME_MAX ); if ( !imm_p->local_io_servers[imm_p->handle_array_copies_local_count] ) { js_p->error_code = -PVFS_ENOMEM; goto error_exit; } memset(imm_p->local_io_servers[imm_p->handle_array_copies_local_count] ,0 ,SERVER_NAME_MAX); memcpy(imm_p->local_io_servers[imm_p->handle_array_copies_local_count], server_name,SERVER_NAME_MAX-1); if ( i < imm_p->dfile_count ) { PVFS_handle_copy( imm_p->handle_array_base_local[ imm_p->handle_array_base_local_count], imm_p->handle_array_base[i]); imm_p->handle_array_base_local_count++; PVFS_handle_unparse(imm_p->handle_array_base_local[imm_p->handle_array_base_local_count] ,PVFS_handle_string); gossip_debug(GOSSIP_MIRROR_DEBUG,"\tlocal source handle(%d):%s\n" ,imm_p->handle_array_base_local_count ,PVFS_handle_string); } imm_p->handle_array_copies_local_count++; } else {/*remote*/ imm_p->remote_io_servers[imm_p->handle_array_copies_remote_count] = malloc( SERVER_NAME_MAX ); if (!imm_p->remote_io_servers[imm_p->handle_array_copies_remote_count]) { js_p->error_code = -PVFS_ENOMEM; goto error_exit; } memset(imm_p->remote_io_servers[imm_p->handle_array_copies_remote_count] ,0 ,SERVER_NAME_MAX); memcpy(imm_p->remote_io_servers[imm_p->handle_array_copies_remote_count] ,server_name,SERVER_NAME_MAX-1); imm_p->handle_array_copies_remote_count++; }/*end if*/ }/*end for*/ gossip_debug(GOSSIP_MIRROR_DEBUG,"\tLocal: %d\tRemote: %d\n" ,imm_p->handle_array_copies_local_count ,imm_p->handle_array_copies_remote_count); imm_p->local_io_servers_count = imm_p->handle_array_copies_local_count; imm_p->remote_io_servers_count = imm_p->handle_array_copies_remote_count; for (i=0; ilocal_io_servers_count; i++) { gossip_debug(GOSSIP_MIRROR_DEBUG,"\tlocal_io_servers[%d]: %s\n",i, imm_p->local_io_servers[i]); } for (i=0; iremote_io_servers_count; i++) { gossip_debug(GOSSIP_MIRROR_DEBUG,"\tremote_io_servers[%d]: %s\n",i, imm_p->remote_io_servers[i]); } /*allocate and initialize space for local and remote handle arrays*/ if (imm_p->handle_array_copies_local_count) { imm_p->handle_array_copies_local = malloc( imm_p->handle_array_copies_local_count * imm_p->copies * sizeof(PVFS_handle)); if ( !imm_p->handle_array_copies_local ) { js_p->error_code = -PVFS_ENOMEM; goto error_exit; } } if (imm_p->handle_array_copies_remote_count) { imm_p->handle_array_copies_remote = malloc( imm_p->handle_array_copies_remote_count * imm_p->copies * sizeof(PVFS_handle)); if ( !imm_p->handle_array_copies_remote ) { js_p->error_code = -PVFS_ENOMEM; goto error_exit; } }/*end if*/ memset(imm_p->handle_array_copies_local ,0 ,imm_p->handle_array_copies_local_count * imm_p->copies * sizeof(PVFS_handle)); memset(imm_p->handle_array_copies_remote ,0 ,imm_p->handle_array_copies_remote_count * imm_p->copies * sizeof(PVFS_handle)); error_exit: /*all other memory will be freed in the "cleanup" action.*/ free(sm_p->key_a); free(sm_p->val_a); free(sm_p->error_a); sm_p->key_a = sm_p->val_a = NULL; sm_p->error_a = NULL; return SM_ACTION_COMPLETE; }/*end action inspect_source_info*/ /*We must get the bstream size for any datahandles that reside on this server.*/ /*This scenario occurs when a metadata and i/o server are one of the same or */ /*a metadata server and i/o server are running on the same machine. I think */ /*this is outdated now, but I check for it anyway. */ static PINT_sm_action obtain_local_handle_sizes(struct PINT_smcb *smcb ,job_status_s *js_p) { gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing obtain_local_handle_sizes....\n"); struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT); PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies); job_id_t job_id; int ret = 0; js_p->error_code = 0; /*Do we have any local handles?*/ if (imm_p->handle_array_copies_local_count == 0) return SM_ACTION_COMPLETE; sm_p->error_a = malloc(imm_p->handle_array_base_local_count * sizeof(PVFS_error) ); if (!sm_p->error_a) { js_p->error_code = -PVFS_ENOMEM; goto error_exit; } imm_p->ds_attr_a = malloc(imm_p->handle_array_base_local_count * sizeof(PVFS_ds_attributes)); if (!imm_p->ds_attr_a) { js_p->error_code = -PVFS_ENOMEM; goto error_exit; } ret = job_trove_dspace_getattr_list(imm_p->fs_id ,imm_p->handle_array_base_local_count ,imm_p->handle_array_base_local ,smcb ,sm_p->error_a ,imm_p->ds_attr_a ,0 ,js_p ,&job_id ,server_job_context ,NULL); return ret; error_exit: if (sm_p->error_a) free(sm_p->error_a); sm_p->error_a = NULL; return SM_ACTION_COMPLETE; }/*end action obtain_local_handle_sizes*/ static PINT_sm_action inspect_local_handle_sizes(struct PINT_smcb *smcb ,job_status_s *js_p) { gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing inspect_local_handle_sizes..\n"); struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT); PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies); int i,j; /*Do we have any local handles?*/ if (imm_p->handle_array_copies_local_count == 0) return SM_ACTION_COMPLETE; gossip_debug(GOSSIP_MIRROR_DEBUG,"\tchecking for errors....\n"); /*check for errors*/ for (i=0; ihandle_array_base_local_count; i++) { if (sm_p->error_a[i]) { js_p->error_code = sm_p->error_a[i]; free(sm_p->error_a); sm_p->error_a = NULL; return SM_ACTION_COMPLETE; } }/*end for*/ js_p->error_code = 0; imm_p->bstream_array_base_local = malloc(imm_p->dfile_count * sizeof(PVFS_size)); if (!imm_p->bstream_array_base_local) { js_p->error_code = -PVFS_ENOMEM; free(sm_p->error_a); sm_p->error_a = NULL; return SM_ACTION_COMPLETE; } memset(imm_p->bstream_array_base_local ,0 ,imm_p->dfile_count * sizeof(PVFS_size)); gossip_debug(GOSSIP_MIRROR_DEBUG ,"\tpopulating bstream_array_base_local...\n"); gossip_debug(GOSSIP_MIRROR_DEBUG,"\thandle_array_base_local_count:%d\n" ,imm_p->handle_array_base_local_count); /*populate bstream_array_base_local*/ for (i=0; ihandle_array_base_local_count; i++) { for (j=0; jdfile_count; j++) { gossip_debug(GOSSIP_MIRROR_DEBUG,"\tlocal handle(%d):%llu" "\tbase handle(%d):%llu\n" ,i,llu(imm_p->handle_array_base_local[i]) ,j,llu(imm_p->handle_array_base[j]) ); if (imm_p->handle_array_base_local[i] == imm_p->handle_array_base[j]) { imm_p->bstream_array_base_local[j] = imm_p->ds_attr_a[i].u.datafile.b_size; gossip_debug(GOSSIP_MIRROR_DEBUG,"\thandle:%llu\tsize:%d\n" ,llu(imm_p->handle_array_base_local[i]) ,(int)imm_p->bstream_array_base_local[j] ); } }/*end for*/ }/*end for*/ free(sm_p->error_a); sm_p->error_a = NULL; return SM_ACTION_COMPLETE; }/*end action inspect_local_handle_sizes*/ static PINT_sm_action create_local_datahandles (struct PINT_smcb *smcb ,job_status_s *js_p) { gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing create_local_datahandles....\n"); struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT); PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies); job_id_t job_id; int ret = 0; int i; PVFS_handle_extent_array data_handle_ext_array; server_configuration_s *config = get_server_config_struct(); js_p->error_code = 0; /*Do we have any local handles?*/ if (imm_p->handle_array_copies_local_count == 0) return SM_ACTION_COMPLETE; gossip_debug(GOSSIP_MIRROR_DEBUG,"Target handle: %llu\tTarget FS ID: %d\n" ,llu(imm_p->metadata_handle),imm_p->fs_id); gossip_debug(GOSSIP_MIRROR_DEBUG,"dfile count: %d\n",imm_p->dfile_count); gossip_debug(GOSSIP_MIRROR_DEBUG,"stuffed size: %d\n" ,sm_p->target_object_attr->u.meta.stuffed_size); gossip_debug(GOSSIP_MIRROR_DEBUG,"hint.flags: %llu\n" ,llu(sm_p->target_object_attr->u.meta.hint.flags)); gossip_debug(GOSSIP_MIRROR_DEBUG,"dfile array P: %p\n" ,sm_p->target_object_attr->u.meta.dfile_array); /*find local IO extent array for this file system for metadata host*/ ret = PINT_cached_config_get_server( imm_p->fs_id ,config->host_id ,PINT_SERVER_TYPE_IO ,&data_handle_ext_array ); if (ret) { js_p->error_code = ret; return SM_ACTION_COMPLETE; } for (i=0;icopies,&imm_p->copies); /*create local datahandles - will be used as destination handles for copies*/ ret = job_trove_dspace_create_list( imm_p->fs_id ,imm_p->handle_array_copies_local ,imm_p->handle_array_copies_local_count * imm_p->copies ,PVFS_TYPE_DATAFILE ,NULL ,TROVE_SYNC ,smcb ,0 ,js_p ,&job_id ,server_job_context ,NULL ); return ret; } /*end action create_local_datahandles*/ static PINT_sm_action create_remote_datahandles (struct PINT_smcb *smcb ,job_status_s *js_p) { gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing create_remote_datahandles...\n"); struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT); PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies); int ret = 0,i; job_id_t job_id; js_p->error_code = 0; if (imm_p->handle_array_copies_remote_count == 0) return SM_ACTION_COMPLETE; int rows = imm_p->copies; int cols = imm_p->handle_array_copies_remote_count; imm_p->my_remote_servers = malloc(sizeof(char *) * rows * cols); if (!imm_p->my_remote_servers) { gossip_lerr("Error allocating imm_p->my_remote_servers.\n"); js_p->error_code = -PVFS_ENOMEM; return SM_ACTION_COMPLETE; } memset(imm_p->my_remote_servers,0,sizeof(char *) * rows * cols); /*setup my_remote_servers[copy,remote#] = remote server name. This will */ /*allow job_precreate_pool to return handle_array_copies_remote where */ /*[copy,remote#] = remote handle. We end up with a list of remotes for */ /*each copy in original distribution order. */ for (i=0; i<(rows*cols); i++) { imm_p->my_remote_servers[i] = imm_p->remote_io_servers[i%cols]; gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\tremote_io_servers[%d]:%s " "my_remote_servers[%d]:%s\n" ,(i%cols) ,imm_p->remote_io_servers[i%cols] ,i ,imm_p->my_remote_servers[i]); } ret = job_precreate_pool_get_handles(imm_p->fs_id ,rows*cols, PVFS_TYPE_DATAFILE, (const char **)imm_p->my_remote_servers ,imm_p->handle_array_copies_remote ,0 ,smcb ,0 ,js_p ,&job_id ,server_job_context ,NULL); return ret; }/*end action create_remote_datahandles*/ static PINT_sm_action setup_datahandle_copies (struct PINT_smcb *smcb ,job_status_s *js_p) { gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing setup_datahandle_copies...\n"); struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT); PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies); server_configuration_s *config = get_server_config_struct(); int i,j,k; js_p->error_code = 0; gossip_debug(GOSSIP_MIRROR_DEBUG,"\tRemote destination handles:\n"); int rows = imm_p->copies; int cols = imm_p->handle_array_copies_remote_count; for (i=0; i<(rows*cols); i++) { gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\tRemote handle(%d):%llu\n" ,i ,llu(imm_p->handle_array_copies_remote[i])); } gossip_debug(GOSSIP_MIRROR_DEBUG,"\tLocal destination handles:\n"); cols = imm_p->handle_array_copies_local_count; for (i=0; i<(rows*cols); i++) { gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\tLocal handle(%d):%llu\n" ,i ,llu(imm_p->handle_array_copies_local[i])); } for (i=0,j=0,k=0; i<(imm_p->io_servers_required * imm_p->copies); i++) { if ( strncmp(imm_p->io_servers[i%imm_p->io_servers_required] ,config->host_id,SERVER_NAME_MAX-1) == 0 ) {/*local*/ memcpy(&imm_p->handle_array_copies[i] ,&imm_p->handle_array_copies_local[j],sizeof(PVFS_handle)); j++; } else {/*remote*/ memcpy(&imm_p->handle_array_copies[i] ,&imm_p->handle_array_copies_remote[k],sizeof(PVFS_handle)); k++; } }/*end for*/ for (i=0; i<(imm_p->io_servers_required * imm_p->copies); i++) gossip_debug(GOSSIP_MIRROR_DEBUG,"\thandle_array_copies[%d]: %llu.\n" ,i ,llu(imm_p->handle_array_copies[i])); gossip_debug(GOSSIP_MIRROR_DEBUG,"\tnumber of io servers required: %d\n" ,imm_p->io_servers_required); /*create and initialize the writes_completed array*/ imm_p->writes_completed = malloc(sizeof(struct writes_completed_s) * imm_p->dfile_count * imm_p->copies); if (!imm_p->writes_completed) { gossip_lerr("Unable to allocate imm_p->writes_completed.\n"); js_p->error_code = -PVFS_ENOMEM; return SM_ACTION_COMPLETE; } for (i=0; i<(imm_p->dfile_count * imm_p->copies); i++) { imm_p->writes_completed[i].status = STATUS_INIT; PVFS_handle_clear(imm_p->writes_completed[i].handle); } //memset(imm_p->writes_completed,UINT64_HIGH,sizeof(PVFS_handle) // * imm_p->dfile_count // * imm_p->copies); /*the retry count is used to monitor how many times we retry a write. */ /*this value is incremented in the check_for_retries state. */ imm_p->retry_count = 0; return SM_ACTION_COMPLETE; }/*end action setup_datahandle_copies*/ static PINT_sm_action copy_data (struct PINT_smcb *smcb, job_status_s *js_p) { gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing copy_data....\n"); struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT); PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies); server_configuration_s *config = get_server_config_struct(); filesystem_configuration_s *fs = PINT_config_find_fs_id(config,imm_p->fs_id); int ret = 0; int src,row,cols,i,index,wc; char PVFS_handle_string[PVFS_HANDLE_STRING_LEN]; /*this variable helps to understand the logic better. it is a redeclara- */ /*tion of the one dimensional imm_p->handle_array_copies and can be ac- */ /*cessed as handle_array_copies[copies,dfile_count]. */ PVFS_handle *handle_array_copies[imm_p->copies]; memset(handle_array_copies,0,sizeof(PVFS_handle) * imm_p->copies); ONE_DIM_TO_TWO_DIMS(imm_p->handle_array_copies ,handle_array_copies ,imm_p->copies,imm_p->io_servers_required ,PVFS_handle); gossip_debug(GOSSIP_MIRROR_DEBUG,"\tone dim to two dims:\n"); for (row=0; rowcopies; row++) { for (cols=0; colsio_servers_required; cols++) { memset(PVFS_handle_string,0,PVFS_HANDLE_STRING_LEN); PVFS_handle_unparse(handle_array_copies[row][cols],PVFS_handle_string); gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\thandle_array_copies[%d][%d] : " "%sn" ,row,cols ,PVFS_handle_string); } } js_p->error_code = 0; /*for each source handle[src], create a MIRROR request containing a set of */ /*destination handles. */ for (src=0; srcdfile_count; src++) { gossip_debug(GOSSIP_MIRROR_DEBUG,"\tWorking on src #%d\n",src); /* writes_completed indicates the status of each copy for each source: */ /* 0 ==> completed, 1 ==> incomplete, STATUS_INIT ==> initial state. */ /* If incomplete, the value stored is the handle is the destination */ /* handle. */ for (row=src,cols=0; colscopies; cols++) { index = (imm_p->copies * row) + cols; if ( imm_p->writes_completed[index].status == 1 || imm_p->writes_completed[index].status == STATUS_INIT ) //if ( imm_p->writes_completed[index] > 0 // || imm_p->writes_completed[index] == UINT64_HIGH) break; } gossip_debug(GOSSIP_MIRROR_DEBUG,"\tValue of cols is %d\n",cols); gossip_debug(GOSSIP_MIRROR_DEBUG,"\tValue of imm_p->copies is %d.\n" ,imm_p->copies); /*if all copies for this source are zero ==> process next source.*/ if (cols==imm_p->copies) { gossip_debug(GOSSIP_MIRROR_DEBUG,"\tThis source[%d] has all " "completed writes.\n" ,src); continue; } struct PVFS_server_req *req = malloc(sizeof(struct PVFS_server_req)); if (!req) { gossip_lerr("Unable to allocate PVFS_server_req.\n"); js_p->error_code = -PVFS_ENOMEM; return SM_ACTION_COMPLETE; } memset(req,0,sizeof(struct PVFS_server_req)); req->u.mirror.dst_handle = malloc(sizeof(PVFS_handle) * imm_p->copies); if ( !req->u.mirror.dst_handle) { gossip_lerr("Unable to allocate mirror.dst_handle.\n"); js_p->error_code = -PVFS_ENOMEM; return SM_ACTION_COMPLETE; } memset(req->u.mirror.dst_handle,0,sizeof(PVFS_handle) * imm_p->copies); /*index into the writes_completed array for each destination handle*/ req->u.mirror.wcIndex = malloc(sizeof(uint32_t) * imm_p->copies); if ( !req->u.mirror.wcIndex ) { gossip_lerr("Unable to allocate mirror.wcIndex.\n"); js_p->error_code = -PVFS_ENOMEM; return SM_ACTION_COMPLETE; } memset(req->u.mirror.wcIndex,0,sizeof(uint32_t) * imm_p->copies); req->op = PVFS_SERV_MIRROR; req->credentials = sm_p->req->credentials; PVFS_handle_copy(req->u.mirror.src_handle, imm_p->handle_array_base[src]); /* In the initial state or when ALL writes have failed, get destination*/ /* handles from handle_array_copies array. Otherwise, use the handles */ /* stored in the writes_completed array. */ index = imm_p->copies*src; /*first copy for this source*/ //if (imm_p->writes_completed[index] == UINT64_HIGH) if ( imm_p->writes_completed[index].status == STATUS_INIT ) { /*handle_array_copies[copy,server#] is accessed as a two-dimensional*/ /*array where a row represents a copy and columns represent the */ /*destination handles,in order of the original file distribution. We*/ /*map the source handle[i], which is also in distribution order, */ /*to handle_arrray_copies[0,i+1],[1,i+2],..,[n-1,(i+y)-1], where n */ /*is the number of copies and y is the number of handles in one copy*/ for (wc=0,row=0,cols=(src+1)%imm_p->io_servers_required; row < imm_p->copies; wc++,row++,cols=(cols+1)%imm_p->io_servers_required) { PVFS_handle_copy(req->u.mirror.dst_handle[row], handle_array_copies[row][cols]); req->u.mirror.wcIndex[row] = index + wc; req->u.mirror.dst_count++; } } else { /*status == 1*/ for (row=src,cols=0,i=0; colscopies; cols++,i++) { index = (imm_p->copies*row) + cols; //if (imm_p->writes_completed[index] > 0) //if ( !PVFS_handle_is_null(imm_p->writes_completed[index]) ) if ( imm_p->writes_completed[index].status > 0 ) { PVFS_handle_copy(req->u.mirror.dst_handle[i], imm_p->writes_completed[index].handle); req->u.mirror.wcIndex[i] = index; req->u.mirror.dst_count++; } } } req->u.mirror.fs_id = imm_p->fs_id; req->u.mirror.dist = imm_p->dist; req->u.mirror.src_server_nr = src; req->u.mirror.flow_type = fs->flowproto; req->u.mirror.encoding = fs->encoding; memset(PVFS_handle_string,0,PVFS_HANDLE_STRING_LEN); PVFS_handle_unparse(req->u.mirror.src_handle,PVFS_handle_string); gossip_debug(GOSSIP_MIRROR_DEBUG,"\treq->: src:%s\tfs_id:%d" "\tdist name:%s\tsrc server_nr:%d\n" ,PVFS_handle_string ,req->u.mirror.fs_id ,req->u.mirror.dist->dist_name ,req->u.mirror.src_server_nr ); for (i=0; iu.mirror.dst_count; i++) { memset(PVFS_handle_string,0,PVFS_HANDLE_STRING_LEN); PVFS_handle_unparse(req->u.mirror.dst_handle[i],PVFS_handle_string); gossip_debug(GOSSIP_MIRROR_DEBUG,"\treq->dst_handle[%d] : %s\n" ,i ,PVFS_handle_string); } struct PINT_server_op *mirror_op = malloc(sizeof(struct PINT_server_op)); if (!mirror_op) { gossip_lerr("Error allocating mirror_op"); js_p->error_code = -PVFS_ENOMEM; return SM_ACTION_COMPLETE; } memset(mirror_op,0,sizeof(struct PINT_server_op)); gossip_debug(GOSSIP_MIRROR_DEBUG,"\tabout to allocate mirror_op...\n"); if (imm_p->bstream_array_base_local) req->u.mirror.bsize = imm_p->bstream_array_base_local[src]; mirror_op->req = req; mirror_op->op = req->op; mirror_op->addr = sm_p->addr;/*get addr for this server*/ gossip_debug(GOSSIP_MIRROR_DEBUG,"\tmirror_op->req(%p)\n" ,mirror_op->req); if ( strncmp(imm_p->io_servers[src] ,config->host_id ,SERVER_NAME_MAX-1) == 0 ) { gossip_debug(GOSSIP_MIRROR_DEBUG,"Above SRC is local.\n"); PINT_sm_push_frame(smcb, LOCAL_SRC, mirror_op); } else { /*setup msgpairarray call. This msgpair represents a connection */ /*between the meta server and a remote IO server. The request */ /*for the remote IO server is PVFS_SERV_MIRROR, which will read */ /*data residing on that server and write it to a destination */ /*handle specified in the request. The response returned from */ /*this msgpair will indicate if the copy was successful. */ gossip_debug(GOSSIP_MIRROR_DEBUG,"Above SRC is remote.\n"); PINT_sm_msgarray_op *msgarray_op = &(mirror_op->msgarray_op); memset(msgarray_op,0,sizeof(PINT_sm_msgarray_op)); msgarray_op->msgarray = &msgarray_op->msgpair; msgarray_op->count = 1; PINT_sm_msgpair_state *msg_p = &msgarray_op->msgpair; msg_p->req = *req; msg_p->fs_id = req->u.mirror.fs_id; PVFS_handle_copy(msg_p->handle, req->u.mirror.src_handle); msg_p->retry_flag = PVFS_MSGPAIR_RETRY; msg_p->comp_fn = mirror_comp_fn; /*setup msgarray parameters*/ PINT_serv_init_msgarray_params(mirror_op,req->u.mirror.fs_id); /*determine the BMI svr address for the source handle*/ ret = PINT_cached_config_map_to_server(&msg_p->svr_addr ,msg_p->handle ,msg_p->fs_id ); if (ret) { gossip_err("Failed to map address\n"); js_p->error_code = -1; return SM_ACTION_COMPLETE; } memset(PVFS_handle_string,0,PVFS_HANDLE_STRING_LEN); PVFS_handle_unparse(msg_p->handle,PVFS_handle_string); gossip_debug(GOSSIP_MIRROR_DEBUG,"\tmsg_p->req.op:%d" "\tmsg_p->fs_id:%d" "\tmsg_p->handle:%s\n" ,msg_p->req.op ,msg_p->fs_id ,PVFS_handle_string ); PINT_sm_push_frame(smcb, REMOTE_SRC, mirror_op); } }/*end for (src)*/ return SM_ACTION_COMPLETE; }/*end action copy_data*/ static PINT_sm_action check_copy_results (struct PINT_smcb *smcb ,job_status_s *js_p) { gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing check_copy_results....\n"); struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT); PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies); int task_id, error_code, remaining, i, j, index; struct PINT_server_op *mirror_op = NULL; struct PVFS_servresp_mirror *respmir = NULL; struct PVFS_servreq_mirror *reqmir = NULL; char PVFS_handle_string[PVFS_HANDLE_STRING_LEN] = {0}; gossip_debug(GOSSIP_MIRROR_DEBUG,"\tjs_p->error_code:%d\n" ,js_p->error_code); js_p->error_code = 0; gossip_debug(GOSSIP_MIRROR_DEBUG,"\tsm_p->op:%d\n",sm_p->op); gossip_debug(GOSSIP_MIRROR_DEBUG,"\tsmcb->base_frame:%d" "\tsmcb->frame_count:%d\n" ,smcb->base_frame,smcb->frame_count); /*the pjmp should have pushed at least one frame*/ assert(smcb->frame_count > (smcb->base_frame+1)); do { mirror_op=PINT_sm_pop_frame(smcb, &task_id, &error_code, &remaining); respmir = &(mirror_op->resp.u.mirror); reqmir = &(mirror_op->req->u.mirror); gossip_debug(GOSSIP_MIRROR_DEBUG,"\tmirror_op->op:%d" "\ttask_id:%d" "\terror_code:%d(%0x)" "\tremaining:%d\n" ,mirror_op->op ,task_id ,error_code,error_code ,remaining); memset(PVFS_handle_string,0,PVFS_HANDLE_STRING_LEN); PVFS_handle_unparse(respmir->src_handle,PVFS_handle_string); gossip_debug(GOSSIP_MIRROR_DEBUG,"\tresp.src_handle:%s " "\tresp.src_server_nr:%d\n" ,PVFS_handle_string ,respmir->src_server_nr ); for (i=0; idst_count; i++) { gossip_debug(GOSSIP_MIRROR_DEBUG ,"\t\tbytes_written[%d]:%d\n" "\t\twrite_status_codde[%d]:%d\n" ,i ,respmir->bytes_written[i] ,i ,respmir->write_status_code[i]); } /*if error_code != 0, then NONE of the writes requested in this msgpair*/ /*array executed, so we do not need to check the individual status */ /*codes. */ if (error_code) { js_p->error_code = error_code; /*respmir has no valid data in it.*/ } else { /*check the write status for each destination associated with this */ /*particular source handle. */ for (i=0; idst_count; i++) { if (js_p->error_code) { gossip_debug(GOSSIP_MIRROR_DEBUG,"\tStatus already " "established:%d(0x%0x)\n" ,js_p->error_code ,js_p->error_code); } else if (respmir->write_status_code[i]) { gossip_debug(GOSSIP_MIRROR_DEBUG,"\tStatus came from error_" "code %d(0x%0x)\n" ,error_code ,error_code); js_p->error_code = respmir->write_status_code[i]; } else { gossip_debug(GOSSIP_MIRROR_DEBUG,"\tStatus is still zero(%d)" "\n" ,js_p->error_code); } }/*end for*/ }/*end if*/ for (i=0; idst_count; i++) { index = reqmir->wcIndex[i]; if ( error_code == 0 /*this will short circuit if false*/ && respmir->write_status_code[i] == 0) { //imm_p->writes_completed[index] = 0; imm_p->writes_completed[index].status = 0; PVFS_handle_clear(imm_p->writes_completed[index].handle); } else if (error_code == 0) { imm_p->writes_completed[index].status = 1; PVFS_handle_copy(imm_p->writes_completed[index].handle ,reqmir->dst_handle[i]); } else { //imm_p->writes_completed[index] = UINT64_HIGH; imm_p->writes_completed[index].status = STATUS_INIT; PVFS_handle_clear(imm_p->writes_completed[index].handle); } } switch(task_id) { case LOCAL_SRC: { gossip_debug(GOSSIP_MIRROR_DEBUG, "\tReturning from LOCAL call...\n"); break; } case REMOTE_SRC: { /*the destory can be moved into cleanup_msgpairarray, if none of*/ /*its values are needed in this function. */ PINT_msgpairarray_destroy(&(mirror_op->msgarray_op)); gossip_debug(GOSSIP_MIRROR_DEBUG, "\tReturning from REMOTE call .\n"); break; } }/*end switch*/ /*cleanup request/response allocations for mirror request*/ if (reqmir->dst_handle) free(reqmir->dst_handle); if (reqmir->wcIndex) free(reqmir->wcIndex); gossip_debug(GOSSIP_MIRROR_DEBUG,"\tmirror_op->req(%p)\n" ,mirror_op->req); free(mirror_op->req); if (respmir->bytes_written) free(respmir->bytes_written); if (respmir->write_status_code) free(respmir->write_status_code); free(mirror_op); } while (remaining > (smcb->base_frame+1)); gossip_debug(GOSSIP_MIRROR_DEBUG,"\tfinal value of js_p->error_code:%d(%0x)\n" ,js_p->error_code, js_p->error_code); /*if one of the writes failed, js_p->error_code will contain an error.*/ gossip_debug(GOSSIP_MIRROR_DEBUG,"\twrites_completed array[src,server#]:\n"); for (i=0; idfile_count; i++) { for (j=0; jcopies; j++) { index = (imm_p->dfile_count * i) + j; char PVFS_handle_string[PVFS_HANDLE_STRING_LEN]; PVFS_handle_unparse(imm_p->writes_completed[index].handle,PVFS_handle_string); gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\t[%d][%d]:%s\n" ,i,j ,PVFS_handle_string); //gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\t[%d][%d]:%llu\n" // ,i,j // ,llu(imm_p->writes_completed[index])); } } return SM_ACTION_COMPLETE; }/*end action check_copy_results*/ static PINT_sm_action check_for_retries (struct PINT_smcb *smcb ,job_status_s *js_p) { gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing check_for_retries....\n"); struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT); PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies); int i; imm_p->retry_count++; /*Have we hit the retry limit?*/ if (imm_p->retry_count >= WRITE_RETRY_LIMIT) { js_p->error_code = 0; return SM_ACTION_COMPLETE; } /*Are there any writes to retry?*/ for (i=0; i<(imm_p->dfile_count * imm_p->copies); i++) { PVFS_handle_unparse(imm_p->writes_completed[i].handle,PVFS_handle_string); gossip_debug(GOSSIP_MIRROR_DEBUG,"\twrites_complete[%d]:%s\n" ,i,PVFS_handle_string); //if (imm_p->writes_completed[i] != 0) if ( !PVFS_handle_is_null(imm_p->writes_completed[i].handle) ) { js_p->error_code = RETRY; return SM_ACTION_COMPLETE; } } js_p->error_code = 0; return SM_ACTION_COMPLETE; }/*end state check_for_retries*/ static PINT_sm_action store_mirror_info (struct PINT_smcb *smcb ,job_status_s *js_p) { gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing store_mirror_info....\n"); struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT); PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies); PVFS_handle *reorg_handles = NULL; int key_count = 3; int ret = 0,i,j; job_id_t job_id; js_p->error_code = 0; /*put copy handles in proper distribution order*/ reorg_handles = reorganize_copy_handles(imm_p); if (!reorg_handles) { gossip_lerr("Unable to create reorg_handles array.\n"); js_p->error_code = -PVFS_ENOMEM; return SM_ACTION_COMPLETE; } /*setup key/val pairs*/ sm_p->keyval_count = key_count; gossip_debug(GOSSIP_MIRROR_DEBUG,"\tvalue of copies:%d \tlocation:%p\n" ,imm_p->copies,&imm_p->copies); sm_p->key_a = malloc(sizeof(PVFS_ds_keyval) * sm_p->keyval_count); sm_p->val_a = malloc(sizeof(PVFS_ds_keyval) * sm_p->keyval_count); if (!sm_p->key_a || !sm_p->val_a) goto error_exit; memset(sm_p->key_a,0,sizeof(PVFS_ds_keyval) * sm_p->keyval_count); memset(sm_p->val_a,0,sizeof(PVFS_ds_keyval) * sm_p->keyval_count); /*setup user.pvfs2.mirror.handles*/ i=0; assert(ikey_a[i].buffer = malloc(sizeof(handle_key)); if (!sm_p->key_a[i].buffer) goto error_exit; strcpy(sm_p->key_a[i].buffer,handle_key); sm_p->key_a[i].buffer_sz = sizeof(handle_key); sm_p->val_a[i].buffer = reorg_handles; sm_p->val_a[i].buffer_sz = sizeof(PVFS_handle) * imm_p->dfile_count * imm_p->copies; /*setup user.pvfs2.mirror.copies*/ i++; assert(ikey_a[i].buffer = malloc(sizeof(copy_count_key)); if (!sm_p->key_a[i].buffer) goto error_exit; strcpy(sm_p->key_a[i].buffer,copy_count_key); sm_p->key_a[i].buffer_sz = sizeof(copy_count_key); sm_p->val_a[i].buffer = malloc(sizeof(imm_p->copies)); if (!sm_p->val_a[i].buffer) goto error_exit; sm_p->val_a[i].buffer_sz = sizeof(imm_p->copies); memcpy(sm_p->val_a[i].buffer,&(imm_p->copies),sm_p->val_a[i].buffer_sz); /*setup user.pvfs2.mirror.status*/ i++; assert(ikey_a[i].buffer = malloc(sizeof(status_key)); if (!sm_p->key_a[i].buffer) goto error_exit; strcpy(sm_p->key_a[i].buffer,status_key); sm_p->key_a[i].buffer_sz = sizeof(status_key); sm_p->val_a[i].buffer = malloc(sizeof(PVFS_error) * imm_p->dfile_count * imm_p->copies); if (!sm_p->val_a[i].buffer) goto error_exit; sm_p->val_a[i].buffer_sz = sizeof(PVFS_error) * imm_p->dfile_count * imm_p->copies; for (j=0; jdfile_count*imm_p->copies; j++) { //memcpy(sm_p->val_a[i].buffer,imm_p->writes_completed // ,sm_p->val_a[i].buffer_sz); memcpy(&((PVFS_error *)(sm_p->val_a[i].buffer))[j] ,&(imm_p->writes_completed[j].status) ,sizeof(PVFS_error)); } /*verify inputs*/ i=0; assert(ikey_a[i].buffer); gossip_debug(GOSSIP_MIRROR_DEBUG,"\tsize of buffer : %d\n" ,sm_p->key_a[i].buffer_sz); PVFS_handle *myHandle = (PVFS_handle *)sm_p->val_a[i].buffer; for (j=0; j<(imm_p->dfile_count*imm_p->copies); j++) { PVFS_handle_unparse(myHandle[j],PVFS_handle_string); gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\thandle(%d):%s\n" ,j ,PVFS_handle_string); } i++; assert(ival_a[i].buffer; sm_p->val_a[i].buffer_sz = sizeof(int); gossip_debug(GOSSIP_MIRROR_DEBUG,"\t%s:%d \tpointer:%p \tbuffer size:%d\n" ,(char *)sm_p->key_a[i].buffer ,*myCount,myCount ,sm_p->val_a[i].buffer_sz); gossip_debug(GOSSIP_MIRROR_DEBUG,"\tkey count:%d\n" ,sm_p->keyval_count); i++; assert(ival_a[i].buffer); for (j=0; j<(imm_p->dfile_count * imm_p->copies); j++) { gossip_debug(GOSSIP_MIRROR_DEBUG,"\thandle(%d):status(%d)\n" ,j ,myStatus[j]); } /*store keys*/ ret = job_trove_keyval_write_list( imm_p->fs_id ,imm_p->metadata_handle ,sm_p->key_a ,sm_p->val_a ,sm_p->keyval_count ,TROVE_SYNC /*trove flags*/ ,NULL ,smcb ,0 ,js_p ,&job_id ,server_job_context ,NULL); gossip_debug(GOSSIP_MIRROR_DEBUG,"\tvalue of ret from call to trove : %d\n" ,ret); i=0; assert(ikey_a[i].buffer); for (j=0; j<(imm_p->dfile_count * imm_p->copies); j++) { PVFS_handle *myHandle = (PVFS_handle *)sm_p->val_a[i].buffer; memset(PVFS_handle_string,0,PVFS_HANDLE_STRING_LEN); PVFS_handle_unparse(myHandle[j],PVFS_handle_string); gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\thandle(%d):%s\n" ,j ,PVFS_handle_string); } i++; assert(ival_a[i].buffer; gossip_debug(GOSSIP_MIRROR_DEBUG,"\t%s:%d \tpointer:%p\n" ,(char *)sm_p->key_a[i].buffer ,*myCount ,sm_p->val_a[i].buffer); gossip_debug(GOSSIP_MIRROR_DEBUG,"\tvalue of ret from trove call : %d\n" ,ret); return (ret); error_exit: for (i=0; ikeyval_count; i++) { if (sm_p->key_a && sm_p->key_a[i].buffer) free(sm_p->key_a[i].buffer); if (sm_p->val_a && sm_p->val_a[i].buffer) free(sm_p->val_a[i].buffer); } if (sm_p->key_a) free(sm_p->key_a); if (sm_p->val_a) free(sm_p->val_a); js_p->error_code = -PVFS_ENOMEM; return SM_ACTION_COMPLETE; }/*end action store_mirror_info*/ static PINT_sm_action check_store_job (struct PINT_smcb *smcb ,job_status_s *js_p) { gossip_debug(GOSSIP_MIRROR_DEBUG, "Executing check_store_job....\n"); struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT); PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies); int i; gossip_debug(GOSSIP_MIRROR_DEBUG,"\tjs_p->error_code:%d\n" ,js_p->error_code); if (js_p->error_code) { PVFS_handle_unparse(imm_p->metadata_handle,PVFS_handle_string); gossip_err("Unable to store datahandles and number of copies for this " "mirror operation.\n"); gossip_err("\tMeta data handle is %s\n",PVFS_handle_string); return SM_ACTION_COMPLETE; } /*release memory used in previous job call*/ for (i=0; ikeyval_count; i++) { free(sm_p->key_a[i].buffer); free(sm_p->val_a[i].buffer); } free(sm_p->key_a); free(sm_p->val_a); sm_p->key_a = sm_p->val_a = NULL; sm_p->keyval_count = 0; js_p->error_code = 0; return SM_ACTION_COMPLETE; }/*end state check_store_job*/ static PINT_sm_action replace_remote_datahandle_objects(struct PINT_smcb *smcb ,job_status_s *js_p) { gossip_debug(GOSSIP_MIRROR_DEBUG, "Executing replace_remote_datahandle_objects....\n"); struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT); PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies); job_id_t job_id; int ret; int tmpindex; PVFS_handle pool_handle; js_p->error_code = 0; if (!imm_p->handle_array_copies_remote) { gossip_debug(GOSSIP_MIRROR_DEBUG,"handle_array_copies_remote is " "null: %p\n" ,imm_p->handle_array_copies_remote); js_p->error_code = REPLACE_DONE; return SM_ACTION_COMPLETE; } imm_p->handle_array_copies_remote_count--; if (imm_p->handle_array_copies_remote_count < 0) { js_p->error_code = REPLACE_DONE; return SM_ACTION_COMPLETE; } tmpindex = imm_p->handle_array_copies_remote_count; /* find the pool that this handle belongs to */ ret = job_precreate_pool_lookup_server( imm_p->remote_io_servers[tmpindex], PVFS_TYPE_DATAFILE, imm_p->fs_id ,&pool_handle ); if (ret < 0) { imm_p->handle_array_copies_remote_count++; js_p->error_code = ret; return SM_ACTION_COMPLETE; } ret = job_precreate_pool_fill( pool_handle ,imm_p->fs_id ,&(imm_p->handle_array_copies_remote[tmpindex]) ,1 ,smcb ,0 ,js_p ,&job_id ,server_job_context ,NULL ); return ret; }/*end action replace_remote_datahandle_objects*/ static PINT_sm_action remove_local_datahandle_objects(struct PINT_smcb *smcb ,job_status_s *js_p) { gossip_debug(GOSSIP_MIRROR_DEBUG, "Executing remove_local_datahandle_objects....\n"); struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT); PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies); job_id_t job_id; int ret; if (js_p->error_code) imm_p->saved_error_code = js_p->error_code; js_p->error_code = 0; if (!imm_p->handle_array_copies_local) return SM_ACTION_COMPLETE; ret = job_trove_dspace_remove_list( imm_p->fs_id ,imm_p->handle_array_copies_local ,NULL ,imm_p->handle_array_copies_local_count ,TROVE_SYNC ,smcb ,0 ,js_p ,&job_id ,server_job_context ,NULL ); return ret; }/*end action remove_local_datahandle_objects*/ static PINT_sm_action cleanup (struct PINT_smcb *smcb, job_status_s *js_p) { gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing cleanup....\n"); struct PINT_server_op *sm_p = NULL; PINT_server_create_copies_op *imm_p = NULL; int i; sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT); imm_p = &(sm_p->u.create_copies); if (js_p->error_code == NOTHING_TO_DO) js_p->error_code = 0; if (imm_p->my_remote_servers) free(imm_p->my_remote_servers); if (imm_p->writes_completed) free(imm_p->writes_completed); if (imm_p->handle_array_copies_local) free(imm_p->handle_array_copies_local); if (imm_p->handle_array_copies_remote) free(imm_p->handle_array_copies_remote); if (imm_p->remote_io_servers) { for (i=0;iremote_io_servers_count;i++) free(imm_p->remote_io_servers[i]); free(imm_p->remote_io_servers); } if (imm_p->local_io_servers) { for (i=0; ilocal_io_servers_count; i++) free(imm_p->local_io_servers[i]); free(imm_p->local_io_servers); } if (imm_p->handle_array_base) free(imm_p->handle_array_base); if (imm_p->handle_array_base_local) free(imm_p->handle_array_base_local); if (imm_p->handle_array_copies) free(imm_p->handle_array_copies); if (imm_p->io_servers) { for (i=0;iio_servers_required;i++) free(imm_p->io_servers[i]); free(imm_p->io_servers); } if (imm_p->ds_attr_a) free(imm_p->ds_attr_a); if (imm_p->bstream_array_base_local) free(imm_p->bstream_array_base_local); if (!js_p->error_code && imm_p->saved_error_code) js_p->error_code = imm_p->saved_error_code; if (imm_p->dist && imm_p->dist->dist_name) free(imm_p->dist->dist_name); if (imm_p->dist) free(imm_p->dist); gossip_debug(GOSSIP_MIRROR_DEBUG,"Leaving cleanup: error_code:%d.....\n" ,js_p->error_code); return SM_ACTION_COMPLETE; }/*end action cleanup*/ int mirror_comp_fn(void *v_p, struct PVFS_server_resp *resp_p, int i) { gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing mirror_comp_fn.....\n"); PINT_smcb *smcb = v_p; struct PINT_server_op *mirror_op = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM); struct PVFS_servresp_mirror *respmir = &(mirror_op->resp.u.mirror); int k; gossip_debug(GOSSIP_MIRROR_DEBUG,"\tmirror_op:%p\n",mirror_op); /* only posted one msgpair per source handle*/ assert(i==0); /* If the response status is non-zero, then the rest of the response is */ /* NOT encoded in final-response.sm. So, there are no values to access. */ /* NOTE: An error code will be returned in the status field IFF NONE of */ /* the writes were successful. Otherwise, the status of each write will */ /* be contained in the write_status_code field. */ if (resp_p->status != 0) return(resp_p->status); memset(PVFS_handle_string,0,PVFS_HANDLE_STRING_LEN); PVFS_handle_unparse(resp_p->u.mirror.src_handle,PVFS_handle_string); gossip_debug(GOSSIP_MIRROR_DEBUG,"\tresp->src_handle:%s " "\tresp->src_server_nr:%d " "\tresp->status:%d\n" ,PVFS_handle_string ,resp_p->u.mirror.src_server_nr ,resp_p->status ); for (k=0; ku.mirror.dst_count; k++) { gossip_debug(GOSSIP_MIRROR_DEBUG,"\tresp->bytes_written[%d]:%d" "\tresp->write_status_code[%d]:%d\n" ,k ,resp_p->u.mirror.bytes_written[k] ,k ,resp_p->u.mirror.write_status_code[k]); } assert(mirror_op->op == PVFS_SERV_MIRROR); memset(&(mirror_op->resp),0,sizeof(mirror_op->resp)); /*capture information from the mirror operation.*/ PVFS_handle_copy(respmir->src_handle, resp_p->u.mirror.src_handle); respmir->src_server_nr = resp_p->u.mirror.src_server_nr; respmir->dst_count = resp_p->u.mirror.dst_count; respmir->bytes_written = malloc(sizeof(uint32_t) * respmir->dst_count); if (!respmir->bytes_written) { gossip_lerr("Unable to allocate respmir->bytes_written\n"); return (-PVFS_ENOMEM); } memset(respmir->bytes_written,0,sizeof(uint32_t) * respmir->dst_count); respmir->write_status_code = malloc(sizeof(uint32_t) * respmir->dst_count); if (!respmir->write_status_code) { gossip_lerr("Unable to allocate respmir->write_status_code.\n"); return (-PVFS_ENOMEM); } memset(respmir->write_status_code,0,sizeof(uint32_t) * respmir->dst_count); memcpy(respmir->bytes_written,resp_p->u.mirror.bytes_written ,sizeof(uint32_t) * respmir->dst_count); memcpy(respmir->write_status_code,resp_p->u.mirror.write_status_code ,sizeof(uint32_t) * respmir->dst_count); gossip_debug(GOSSIP_MIRROR_DEBUG,"\tsmcb->base_frame:%d\tframe_count:%d\n" ,smcb->base_frame,smcb->frame_count); return(0); } /*end msgpair completion function mirror_comp_fn*/ static PVFS_handle *reorganize_copy_handles( struct PINT_server_create_copies_op *imm_p) { gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing reorganize_copy_handles..\n"); uint64_t i,j,k,rows,in_cols,out_cols; PVFS_handle *copies_out = NULL; PVFS_handle *copies_in = imm_p->handle_array_copies; rows = imm_p->copies; in_cols = imm_p->io_servers_required; out_cols = imm_p->dfile_count; /* allocate copies_out array */ copies_out = malloc(sizeof(PVFS_handle) * rows * out_cols); if (!copies_out) { gossip_lerr("Unable to allocate memeory.\n"); return (NULL); } memset(copies_out,0,sizeof(PVFS_handle) * rows * out_cols); for (i=0; i<(in_cols*rows); i++) { PVFS_handle_unparse(copies_in[i],PVFS_handle_string); gossip_debug(GOSSIP_MIRROR_DEBUG,"\thandle_array_copies(%d):%s\n" ,(int)i ,PVFS_handle_string); } /*this code copies copies_in[n+1] to copies_out[n] within the same row*/ /*each row represents one copy of the logical file, i.e., each of its */ /*datahandles. */ for (i=0,k=1; iio_servers_required; i++) gossip_debug(GOSSIP_MIRROR_DEBUG,"\t [%d]:%s" "\tlength:%d\n" ,i ,imm_p->io_servers[i] ,(int)strlen(imm_p->io_servers[i])); /*Get access to the io server names residing in the cache*/ ret = PINT_cached_config_io_server_names(&list,&size,imm_p->fs_id); if (ret) { if (list) free(list); gossip_lerr("Unable to retrieve IO server names from the cache.\n"); return(ret); } gossip_debug(GOSSIP_MIRROR_DEBUG,"\tReturned from PINT_cached_config...\n"); for (i=0; idfile_count; j++) { if (strncmp(list[i],imm_p->io_servers[j],strlen(list[i])) == 0) { list[i] = NULL; break; } }/*end for*/ }/*end for*/ /*Add server names to io_servers list*/ for (i=0,j=imm_p->dfile_count; iio_servers_required; i++) { if (list[i]) { strncpy(imm_p->io_servers[j],list[i],SERVER_NAME_MAX-1); j++; } }/*end for*/ gossip_debug(GOSSIP_MIRROR_DEBUG, "\tio_servers(after):\n"); for (i=0; iio_servers_required; i++) gossip_debug(GOSSIP_MIRROR_DEBUG,"\t [%d]:%s\n" ,i ,imm_p->io_servers[i]); /*deallocate memory used for "list"*/ free(list); return (0); }/*end function get_server_names*/ /******************************************************************************/ /* Right now, this state machine is not called as a standalone request. It is */ /* only called as a nested machine from seteattr; however, when time comes to */ /* create a standalone server request, the values used for the request */ /* parameters are listed below. */ /******************************************************************************/ static inline int PINT_get_object_ref_copies( struct PVFS_server_req *req ,PVFS_fs_id *fs_id ,PVFS_handle *handle ) { *fs_id = req->u.seteattr.fs_id; PVFS_handle_copy(*handle, req->u.seteattr.handle); return 0; }; /*request parameters*/ struct PINT_server_req_params pvfs2_create_immutable_copies_params = { .string_name = "create_immutable_copies", .perm = PINT_SERVER_CHECK_NONE, .access_type = PINT_server_req_modify, .sched_policy = PINT_SERVER_REQ_SCHEDULE, .get_object_ref = PINT_get_object_ref_copies, .state_machine = &pvfs2_create_immutable_copies_sm }; /****** E N D O F F I L E *******************************/