Changeset 8901
- Timestamp:
- 06/29/11 09:04:42 (23 months ago)
- Files:
-
- 1 modified
Legend:
- Unmodified
- Added
- Removed
-
branches/Orange-Branch/src/io/trove/trove-dbpf/dbpf-bstream-direct.c
r8317 r8901 35 35 36 36 static gen_mutex_t dbpf_update_size_lock = GEN_MUTEX_INITIALIZER; 37 static gen_mutex_t grow_bstream_table_lock = GEN_MUTEX_INITIALIZER; 37 38 38 39 typedef struct … … 42 43 TROVE_offset offset; 43 44 } dbpf_stream_extents_t; 45 46 struct qhash_table *grow_bstream_table = NULL; 47 48 struct grow_bstream_handle 49 { 50 struct qlist_head hash_link; 51 gen_mutex_t handle_lock; 52 gen_mutex_t refcount_lock; 53 PVFS_handle handle; 54 int refcount; 55 }; 44 56 45 57 static int dbpf_bstream_get_extents( … … 53 65 dbpf_stream_extents_t *extents); 54 66 67 static int hash_handle_compare( 68 void *key, 69 struct qlist_head *link); 70 71 static int hash_handle( 72 void *handle, 73 int table_size); 74 75 static int grow_bstream_handle_table_init( int size ); 76 static int grow_bstream_handle_acquire_lock( TROVE_object_ref ref ); 77 static int grow_bstream_handle_release_lock( TROVE_object_ref ref ); 78 55 79 static size_t direct_aligned_write(int fd, 56 80 void *buf, … … 167 191 * @param write_offset - the offset into the bstream to start the write 168 192 * 169 * @param stream_size - the actual size of the bstream (might be stored elsewhere) 193 * @param stream_size - the actual size of the bstream (might be stored 194 * elsewhere) 170 195 * 171 196 * @returns bytes written, otherwise a negative errno error code … … 682 707 if(ret != 0) 683 708 { 684 gossip_err("%s: failed to get size in dspace attr: (error=%d)\n", __func__, ret); 709 gossip_err("%s: failed to get size in dspace attr: (error=%d)\n", 710 __func__, ret); 685 711 goto done; 686 712 } … … 697 723 if(ret != 0) 698 724 { 699 gossip_err("%s: failed to get bstream extents from offset/sizes: (error=%d)\n", __func__, ret); 725 gossip_err("%s: failed to get bstream extents from offset/sizes: " 726 "(error=%d)\n", __func__, ret); 700 727 goto done; 701 728 } … … 718 745 if(ret != 0) 719 746 { 720 gossip_err("%s: failed to get bstream extents from offset/sizes: (error=%d)\n", __func__, ret); 747 gossip_err("%s: failed to get bstream extents from offset/sizes: " 748 "(error=%d)\n", __func__, ret); 721 749 goto done; 722 750 } … … 733 761 { 734 762 ret = -trove_errno_to_trove_error(-ret); 735 gossip_err("%s: direct_locked_read failed: (error=%d)\n", __func__, ret); 763 gossip_err("%s: direct_locked_read failed: (error=%d)\n", __func__, 764 ret); 736 765 goto done; 737 766 } … … 778 807 if(ret != 0) 779 808 { 780 gossip_err("%s: failed to count extents from stream offset/sizes: (error=%d)\n", __func__, ret); 809 gossip_err("%s: failed to count extents from stream offset/sizes: " 810 "(error=%d)\n", __func__, ret); 781 811 goto cache_put; 782 812 } … … 800 830 if(ret != 0) 801 831 { 802 gossip_err("%s: failed to get stream extents from stream offset/sizes: (error=%d)\n", __func__, ret); 832 gossip_err("%s: failed to get stream extents from stream offset/sizes: " 833 "(error=%d)\n", __func__, ret); 803 834 goto cache_put; 804 835 } 836 837 if( grow_bstream_table == NULL ) 838 { 839 ret = grow_bstream_handle_table_init( 1021 ); 840 if( ret != 0 ) 841 { 842 gossip_err("%s: failed to create grow_bstream_handle_table\n", 843 __func__); 844 goto cache_put; 845 } 846 } 847 848 /* acquire a lock on this handle prior to getting the size to prevent 849 * a race condition between multiple writes getting the wrong size */ 850 grow_bstream_handle_acquire_lock( ref ); 805 851 806 852 ret = dbpf_dspace_attr_get(qop_p->op.coll_p, ref, &attr); 807 853 if(ret != 0) 808 854 { 809 gossip_err("%s: failed to get dspace attr for bstream: (error=%d)\n", __func__, ret); 855 gossip_err("%s: failed to get dspace attr for bstream: (error=%d)\n", 856 __func__, ret); 857 grow_bstream_handle_release_lock( ref ); 810 858 goto cache_put; 811 859 } 812 860 861 /* prior to writes see if we are growing the file, if not, release the 862 * lock for growing file size since we won't be updating the file size 863 * below */ 864 for(i = 0; i < extent_count; ++ i) 865 { 866 if(eor < stream_extents[i].offset + stream_extents[i].size) 867 { 868 eor = stream_extents[i].offset + stream_extents[i].size; 869 } 870 } 871 if(eor <= attr.u.datafile.b_size) 872 { 873 /* file size is not growing so we do not need to hold the lock 874 * since we won't update the size attribute below */ 875 grow_bstream_handle_release_lock( ref ); 876 } 877 813 878 *rw_op->out_size_p = 0; 879 814 880 for(i = 0; i < extent_count; ++ i) 815 881 { … … 822 888 if(ret < 0) 823 889 { 824 gossip_err("%s: failed to perform direct locked write: (error=%d)\n", __func__, ret); 890 gossip_err("%s: failed to perform direct locked write: " 891 "(error=%d)\n", __func__, ret); 825 892 goto cache_put; 826 893 } 827 828 if(eor < stream_extents[i].offset + stream_extents[i].size)829 {830 eor = stream_extents[i].offset + stream_extents[i].size;831 }832 894 /* did this calculation above 895 * if(eor < stream_extents[i].offset + stream_extents[i].size) 896 * { 897 * eor = stream_extents[i].offset + stream_extents[i].size; 898 * } 899 */ 833 900 *rw_op->out_size_p += ret; 834 901 } … … 842 909 if(ret != 0) 843 910 { 844 gossip_err("%s: failed to get size from dspace attr: (error=%d)\n", __func__, ret); 911 gossip_err("%s: failed to get size from dspace attr: (error=%d)\n", 912 __func__, ret); 845 913 gen_mutex_unlock(&dbpf_update_size_lock); 914 grow_bstream_handle_release_lock( ref ); 846 915 goto cache_put; 847 916 } … … 854 923 if(ret != 0) 855 924 { 856 gossip_err("%s: failed to update size in dspace attr: (error=%d)\n", __func__, ret); 925 gossip_err("%s: failed to update size in dspace attr: " 926 "(error=%d)\n", __func__, ret); 857 927 gen_mutex_unlock(&dbpf_update_size_lock); 928 grow_bstream_handle_release_lock( ref ); 858 929 goto cache_put; 859 930 } … … 886 957 if(ret < 0) 887 958 { 888 gossip_err("%s: failed to coalesce size update in dspace attr: (error=%d)\n", __func__, ret); 959 gossip_err("%s: failed to coalesce size update in dspace " 960 "attr: (error=%d)\n", __func__, ret); 961 grow_bstream_handle_release_lock( ref ); 889 962 goto done; 890 963 } 891 964 965 ret = grow_bstream_handle_release_lock( ref ); 966 if( ret != 0 ) 967 { 968 gossip_debug(GOSSIP_DIRECTIO_DEBUG, "%s: failed to release " 969 "grow_bstream_handle lock when not updating " 970 "file size\n", __func__ ); 971 } 972 892 973 ret = PINT_MGMT_OP_CONTINUE; 893 974 goto done; 894 975 } 895 } 896 897 ret = PINT_MGMT_OP_COMPLETED; 976 else 977 { 978 /* still need to release the lock even thought we didn't update 979 * the size because the size calc prior to doing the writes told 980 * use we would */ 981 ret = grow_bstream_handle_release_lock( ref ); 982 if( ret != 0 ) 983 { 984 gossip_debug(GOSSIP_DIRECTIO_DEBUG, "%s: failed to release " 985 "grow_bstream_handle lock when not updating " 986 "file size\n", __func__ ); 987 } 988 } 989 } 990 /* if we don't try to update the size then we already released the 991 * handle grow lock above */ 992 993 ret = PINT_MGMT_OP_COMPLETED; 898 994 899 995 cache_put: … … 917 1013 TROVE_context_id context_id, 918 1014 TROVE_op_id *out_op_id_p, 919 PVFS_hint hints)1015 PVFS_hint hints) 920 1016 { 921 1017 return -TROVE_ENOSYS; … … 932 1028 TROVE_context_id context_id, 933 1029 TROVE_op_id *out_op_id_p, 934 PVFS_hint hints)1030 PVFS_hint hints) 935 1031 { 936 1032 return -TROVE_ENOSYS; … … 962 1058 if (coll_p == NULL) 963 1059 { 964 gossip_err("%s: failed to find collection with fsid %d\n", __func__, coll_id); 1060 gossip_err("%s: failed to find collection with fsid %d\n", 1061 __func__, coll_id); 965 1062 return -TROVE_EINVAL; 966 1063 } … … 1019 1116 if(ret < 0) 1020 1117 { 1021 gossip_err("%s: failed to post direct read op: (error=%d)\n", __func__, ret); 1118 gossip_err("%s: failed to post direct read op: (error=%d)\n", 1119 __func__, ret); 1022 1120 return ret; 1023 1121 } … … 1093 1191 *out_op_id_p = q_op_p->op.id; 1094 1192 1095 gossip_debug(GOSSIP_DIRECTIO_DEBUG, "%s: queuing direct write operation\n", __func__); 1193 gossip_debug(GOSSIP_DIRECTIO_DEBUG, "%s: queuing direct write operation\n", 1194 __func__); 1096 1195 PINT_manager_id_post( 1097 1196 io_thread_mgr, q_op_p, &q_op_p->mgr_op_id, … … 1388 1487 } 1389 1488 1489 /* grow_bstream_handle_table_init() 1490 * 1491 * initialize the grow_bstream_table 1492 * 1493 * size: prime number of hash table size 1494 */ 1495 static int grow_bstream_handle_table_init( int size ) 1496 { 1497 gen_mutex_lock( &grow_bstream_table_lock ); 1498 if( grow_bstream_table == NULL ) 1499 { 1500 grow_bstream_table = qhash_init(hash_handle_compare, hash_handle, size); 1501 if( grow_bstream_table == NULL ) 1502 { 1503 return -PVFS_ENOMEM; 1504 } 1505 } 1506 gen_mutex_unlock( &grow_bstream_table_lock ); 1507 return 0; 1508 } 1509 1510 /* obtains a per-handle lock by locking an existing entry in the 1511 * grow_bstream_table or creating an entry and grabbing the lock. 1512 * 1513 * returns 0 on success and the handle lock held or an error 1514 */ 1515 static int grow_bstream_handle_acquire_lock( TROVE_object_ref ref ) 1516 { 1517 struct qlist_head *hash_link = NULL; 1518 struct grow_bstream_handle *grow_handle = NULL; 1519 1520 gen_mutex_lock( &grow_bstream_table_lock ); 1521 if( grow_bstream_table == NULL ) 1522 { 1523 gen_mutex_unlock( &grow_bstream_table_lock ); 1524 return -PVFS_EINVAL; 1525 } 1526 1527 hash_link = qhash_search(grow_bstream_table, &(ref.handle) ); 1528 if( hash_link ) 1529 { 1530 grow_handle = qlist_entry( hash_link, struct grow_bstream_handle, 1531 hash_link); 1532 } 1533 else 1534 { 1535 grow_handle = calloc( 1, sizeof( struct grow_bstream_handle )); 1536 if( grow_handle == NULL ) 1537 { 1538 gen_mutex_unlock( &grow_bstream_table_lock ); 1539 gossip_err( "%s: failed to alloc memory\n", __func__); 1540 return -PVFS_ENOMEM; 1541 } 1542 grow_handle->handle = ref.handle; 1543 gen_mutex_init( &(grow_handle->handle_lock) ); 1544 gen_mutex_init( &(grow_handle->refcount_lock) ); 1545 1546 /* we're safe adding it and waiting on grabbing the lock because 1547 * we still have the lock on the table so no one else 1548 * should access this new member */ 1549 qhash_add( grow_bstream_table, &(grow_handle->handle), 1550 &(grow_handle->hash_link) ); 1551 } 1552 1553 gen_mutex_unlock( &grow_bstream_table_lock ); 1554 1555 /* increment the number of things using the hash member */ 1556 gen_mutex_lock( &(grow_handle->refcount_lock) ); 1557 grow_handle->refcount++; 1558 gen_mutex_unlock( &(grow_handle->refcount_lock) ); 1559 1560 gen_mutex_lock( &(grow_handle->handle_lock)); 1561 1562 return 0; 1563 } 1564 1565 static int grow_bstream_handle_release_lock( TROVE_object_ref ref ) 1566 { 1567 struct qlist_head *hash_link = NULL; 1568 struct grow_bstream_handle *grow_handle = NULL; 1569 int rcount = 0; 1570 1571 gen_mutex_lock( &grow_bstream_table_lock ); 1572 if( grow_bstream_table == NULL ) 1573 { 1574 gen_mutex_unlock( &grow_bstream_table_lock ); 1575 return -PVFS_EINVAL; 1576 } 1577 1578 hash_link = qhash_search_and_remove(grow_bstream_table, &(ref.handle) ); 1579 if( hash_link ) 1580 { 1581 grow_handle = qlist_entry(hash_link, struct grow_bstream_handle, 1582 hash_link); 1583 1584 gen_mutex_lock( &(grow_handle->refcount_lock) ); 1585 rcount = --grow_handle->refcount; 1586 gen_mutex_unlock( &(grow_handle->refcount_lock) ); 1587 1588 /* if we're the last reference remove it from the hash and free the 1589 * memory. may want to optimize this so we aren't continuously 1590 * alloc/free for each read */ 1591 if( rcount == 0 ) 1592 { 1593 gen_mutex_unlock( &(grow_handle->handle_lock)); 1594 gen_mutex_destroy( &(grow_handle->handle_lock) ); 1595 gen_mutex_destroy( &(grow_handle->refcount_lock) ); 1596 free( grow_handle ); 1597 } 1598 else 1599 { 1600 gen_mutex_unlock( &(grow_handle->handle_lock)); 1601 } 1602 } 1603 else 1604 { 1605 /* should have an entry, but if not just report it for debugging */ 1606 gossip_debug(GOSSIP_DIRECTIO_DEBUG, "%s: no grow_handle entry when " 1607 "trying to remove with refcount %d\n", __func__, rcount); 1608 } 1609 gen_mutex_unlock( &grow_bstream_table_lock ); 1610 return 0; 1611 } 1612 1613 /* hash_handle() 1614 * 1615 * hash function for handles added to table 1616 * taken from src/server/request-scheduler/request-scheduler.c 1617 * 1618 * returns integer offset into table 1619 */ 1620 static int hash_handle( 1621 void *handle, 1622 int table_size) 1623 { 1624 /* TODO: update this later with a better hash function, 1625 * depending on what handles look like, for now just modding 1626 * 1627 */ 1628 unsigned long tmp = 0; 1629 PVFS_handle *real_handle = handle; 1630 1631 tmp += (*(real_handle)); 1632 tmp = tmp % table_size; 1633 1634 return ((int) tmp); 1635 } 1636 1637 /* hash_handle_compare() 1638 * 1639 * performs a comparison of a hash table entro to a given key 1640 * (used for searching) 1641 * taken from src/server/request-scheduler/request-scheduler.c 1642 * 1643 * returns 1 if match found, 0 otherwise 1644 */ 1645 static int hash_handle_compare( 1646 void *key, 1647 struct qlist_head *link) 1648 { 1649 struct grow_bstream_handle *my_handle; 1650 PVFS_handle *real_handle = key; 1651 1652 my_handle = qlist_entry(link, struct grow_bstream_handle, hash_link); 1653 if (my_handle->handle == *real_handle) 1654 { 1655 return (1); 1656 } 1657 1658 return (0); 1659 } 1660 1390 1661 #if 0 1391 1662 int dbpf_aligned_blocks_init(void) … … 1433 1704 } 1434 1705 1435 ablock = qlist_entry(aligned_blocks_unused.next, struct aligned_block, link); 1706 ablock = qlist_entry(aligned_blocks_unused.next, struct aligned_block, 1707 link); 1436 1708 qlist_del(&ablock->link); 1437 1709 ptr = ablock->ptr;
