Changeset 8560

Show
Ignore:
Timestamp:
10/12/10 17:28:24 (3 years ago)
Author:
sampson
Message:

Porting BMI TCP

Location:
branches/windows-client/src
Files:
4 modified

Legend:

Unmodified
Added
Removed
  • branches/windows-client/src/common/misc/pint-mem.c

    r8550 r8560  
    1515#ifdef WIN32 
    1616#include "wincommon.h" 
     17 
     18/* do not declare inline on Windows (can't be exported)*/ 
     19#undef inline 
     20#define inline 
    1721#endif 
    1822 
  • branches/windows-client/src/io/bmi/bmi_wintcp/bmi-wintcp.c

    r8556 r8560  
    19891989        if (tcp_addr_data->socket > -1) 
    19901990        { 
    1991             close(tcp_addr_data->socket); 
     1991            closesocket(tcp_addr_data->socket); 
    19921992        } 
    19931993    } 
     
    22652265        tmp_errno = WSAGetLastError(); 
    22662266        gossip_lerr("Error: failed to set TCP_NODELAY option.\n"); 
    2267         close(tcp_addr_data->socket); 
     2267        closesocket(tcp_addr_data->socket); 
    22682268        return (bmi_tcp_errno_to_pvfs(-tmp_errno)); 
    22692269    } 
     
    27932793    if (tcp_addr_data->socket > -1) 
    27942794    { 
    2795         close(tcp_addr_data->socket); 
     2795        closesocket(tcp_addr_data->socket); 
    27962796    } 
    27972797    tcp_addr_data->socket = -1; 
     
    35333533 
    35343534    /* perform a read on the socket so that we can get a real errno */ 
    3535     ret = read(tcp_addr_data->socket, &buf, sizeof(int)); 
     3535    ret = recv(tcp_addr_data->socket, &buf, sizeof(int), 0); 
    35363536    if (ret == 0) 
    35373537        tmp_errno = EPIPE;  /* report other side closed socket with this */ 
     
    37673767        tmp_errno = WSAGetLastError(); 
    37683768        gossip_lerr("Error: failed to set TCP_NODELAY option.\n"); 
    3769         close(*socket); 
     3769        closesocket(*socket); 
    37703770        return (bmi_tcp_errno_to_pvfs(-tmp_errno)); 
    37713771    } 
     
    37843784    if(!(*peer)) 
    37853785    { 
    3786         close(*socket); 
     3786        closesocket(*socket); 
    37873787        return(bmi_tcp_errno_to_pvfs(-BMI_ENOMEM)); 
    37883788    } 
  • branches/windows-client/src/io/bmi/bmi_wintcp/socket-collection.c

    r8556 r8560  
    1515 * will always check to see if there is data to be read on a socket. 
    1616 */ 
     17 
     18#include <WinSock2.h> 
     19 
    1720#include <stdio.h> 
    1821#include <string.h> 
    1922#include <errno.h> 
     23#include <io.h> 
    2024 
    2125#include "gossip.h" 
     
    5761 
    5862    tmp_scp->pollfd_array = (struct 
    59         pollfd*)malloc(POLLFD_ARRAY_START*sizeof(struct pollfd)); 
    60     if(!tmp_scp->pollfd_array) 
    61     { 
    62         free(tmp_scp); 
    63         return(NULL); 
    64     } 
     63        pollfd*)malloc(POLLFD_ARRAY_START*sizeof(WSAPOLLFD)); 
     64     
    6565    tmp_scp->addr_array = 
    6666        (bmi_method_addr_p*)malloc(POLLFD_ARRAY_START*sizeof(bmi_method_addr_p)); 
    6767    if(!tmp_scp->addr_array) 
    6868    { 
    69         free(tmp_scp->pollfd_array); 
    70         free(tmp_scp); 
     69        free(tmp_scp->pollfd_array); 
     70        free(tmp_scp); 
    7171        return NULL; 
    7272    } 
    73     if (pipe(tmp_scp->pipe_fd) < 0) 
     73    /*if (pipe(tmp_scp->pipe_fd) < 0)*/ 
     74    if (!CreatePipe(&(tmp_scp->pipe_fd[0]),  
     75                    &(tmp_scp->pipe_fd[1]), 
     76                    NULL, 0)) 
    7477    { 
    7578        perror("pipe failed:"); 
    76         free(tmp_scp->addr_array); 
    77         free(tmp_scp->pollfd_array); 
    78         free(tmp_scp); 
     79        BMI_socket_collection_finalize(tmp_scp); 
    7980        return NULL; 
    8081    } 
    8182 
    82     tmp_scp->array_max = POLLFD_ARRAY_START; 
     83    tmp_scp->array_max = POLLFD_ARRAY_START;     
    8384    tmp_scp->array_count = 0; 
    8485    INIT_QLIST_HEAD(&tmp_scp->remove_queue); 
     
    9596 
    9697    /* Add the pipe_fd[0] fd to the poll in set always */ 
     98    /* -- must be handled separately on Windows  
    9799    tmp_scp->pollfd_array[tmp_scp->array_count].fd = tmp_scp->pipe_fd[0]; 
    98100    tmp_scp->pollfd_array[tmp_scp->array_count].events = POLLIN; 
    99101    tmp_scp->addr_array[tmp_scp->array_count] = NULL; 
    100     tmp_scp->array_count++; 
     102    tmp_scp->array_count++;     
     103    */ 
    101104 
    102105    return (tmp_scp); 
     
    155158void BMI_socket_collection_finalize(socket_collection_p scp) 
    156159{ 
     160    free(scp->addr_array); 
    157161    free(scp->pollfd_array); 
    158     free(scp->addr_array); 
    159162    free(scp); 
    160163    return; 
     
    182185    struct tcp_addr* tcp_addr_data = NULL; 
    183186    struct tcp_addr* shifted_tcp_addr_data = NULL; 
    184     struct pollfd* tmp_pollfd_array = NULL; 
     187    WSAPOLLFD* tmp_pollfd_array = NULL; 
    185188    bmi_method_addr_p* tmp_addr_array = NULL; 
    186189    int ret = -1; 
     
    247250            { 
    248251                /* we must enlarge the poll arrays */ 
    249                 tmp_pollfd_array = (struct pollfd*)malloc( 
    250                     (scp->array_max+POLLFD_ARRAY_INC)*sizeof(struct pollfd));  
     252                tmp_pollfd_array = (WSAPOLLFD*)malloc( 
     253                    (scp->array_max+POLLFD_ARRAY_INC)*sizeof(WSAPOLLFD));  
    251254                /* TODO: handle this */ 
    252255                assert(tmp_pollfd_array); 
     
    256259                assert(tmp_addr_array); 
    257260                memcpy(tmp_pollfd_array, scp->pollfd_array, 
    258                     scp->array_max*sizeof(struct pollfd)); 
     261                    scp->array_max*sizeof(WSAPOLLFD)); 
    259262                free(scp->pollfd_array); 
    260263                scp->pollfd_array = tmp_pollfd_array; 
     
    281284    do 
    282285    { 
    283         ret = poll(scp->pollfd_array, scp->array_count, allowed_poll_time); 
    284     } while(ret < 0 && errno == EINTR); 
    285     old_errno = errno; 
     286        DWORD bytes; 
     287 
     288        /* poll for 1ms */ 
     289        ret = WSAPoll(scp->pollfd_array, scp->array_count, 1); 
     290        old_errno = WSAGetLastError(); 
     291        allowed_poll_time--; 
     292 
     293        if (ret == 0)  
     294        { 
     295            /* check our pipe */ 
     296            if (PeekNamedPipe(scp->pipe_fd[0], NULL, 0, NULL, &bytes, NULL)) 
     297            { 
     298                if (bytes) 
     299                { 
     300                    pipe_notify = 1; 
     301                } 
     302            } 
     303            else 
     304            { 
     305                ret = -1; 
     306            } 
     307        } 
     308    } while(ret == 0 && !pipe_notify && allowed_poll_time > 0);     
    286309 
    287310    if(ret < 0) 
     
    296319    } 
    297320 
     321    if (pipe_notify) 
     322    { 
     323        char c; 
     324        DWORD count; 
     325        /* drain the pipe */ 
     326        ReadFile(scp->pipe_fd[0], &c, 1, &count, NULL); 
     327    } 
     328 
    298329    tmp_count = ret; 
    299330 
     
    306337        } 
    307338        /* make sure we dont return the pipe fd as being ready */ 
    308         if (scp->pollfd_array[i].fd == scp->pipe_fd[0]) 
     339        /* if (scp->pollfd_array[i].fd == scp->pipe_fd[0]) 
    309340        { 
    310341            if (scp->pollfd_array[i].revents) { 
    311342                char c; 
    312343                /* drain the pipe */ 
    313                 read(scp->pipe_fd[0], &c, 1); 
     344        /*        _read(scp->pipe_fd[0], &c, 1); 
    314345                pipe_notify = 1; 
    315346            } 
    316347            continue; 
    317         } 
     348        } */ 
    318349        /* anything ready on this socket? */ 
    319350        if (scp->pollfd_array[i].revents) 
  • branches/windows-client/src/io/bmi/bmi_wintcp/socket-collection.h

    r8550 r8560  
    2727struct socket_collection 
    2828{ 
    29     struct pollfd* pollfd_array; 
     29    /*struct pollfd* pollfd_array;*/ 
     30    WSAPOLLFD *pollfd_array; 
    3031    bmi_method_addr_p* addr_array; 
    3132    int array_max; 
     
    3738 
    3839    int server_socket; 
    39     int pipe_fd[2]; 
     40    HANDLE pipe_fd[2]; 
    4041}; 
    4142typedef struct socket_collection* socket_collection_p; 
     
    6162    if(tcp_data->socket > -1){ \ 
    6263        char c; \ 
     64        DWORD count; \ 
    6365        gen_mutex_lock(&((s)->queue_mutex)); \ 
    6466        BMI_socket_collection_queue(s, m, &((s)->add_queue)); \ 
    6567        gen_mutex_unlock(&((s)->queue_mutex)); \ 
    66         write(s->pipe_fd[1], &c, 1);\ 
     68        WriteFile(s->pipe_fd[1], &c, 1, &count, NULL);\ 
    6769    } \ 
    6870} while(0) 
     
    7072#define BMI_socket_collection_remove(s, m) \ 
    7173do { \ 
    72     char c;\ 
     74    char c; \ 
     75    DWORD count; \ 
    7376    gen_mutex_lock(&((s)->queue_mutex)); \ 
    7477    BMI_socket_collection_queue(s, m, &((s)->remove_queue)); \ 
    7578    gen_mutex_unlock(&((s)->queue_mutex)); \ 
    76     write(s->pipe_fd[1], &c, 1);\ 
     79    WriteFile(s->pipe_fd[1], &c, 1, &count, NULL);\ 
    7780} while(0) 
    7881 
     
    8184do { \ 
    8285    char c;\ 
     86    DWORD count; \ 
    8387    struct tcp_addr* tcp_data = (struct tcp_addr *) (m)->method_data; \ 
    8488    assert(tcp_data->socket > -1); \ 
     
    8791    BMI_socket_collection_queue((s),(m), &((s)->add_queue)); \ 
    8892    gen_mutex_unlock(&((s)->queue_mutex)); \ 
    89     _write(s->pipe_fd[1], &c, 1);\ 
     93    WriteFile(s->pipe_fd[1], &c, 1, &count, NULL);\ 
    9094} while(0) 
    9195 
     
    9397do { \ 
    9498    char c;\ 
     99    DWORD count; \ 
    95100    struct tcp_addr* tcp_data = (struct tcp_addr *) (m)->method_data; \ 
    96101    gen_mutex_lock(&((s)->queue_mutex)); \ 
     
    99104    BMI_socket_collection_queue((s),(m), &((s)->add_queue)); \ 
    100105    gen_mutex_unlock(&((s)->queue_mutex)); \ 
    101     _write(s->pipe_fd[1], &c, 1);\ 
     106    WriteFile(s->pipe_fd[1], &c, 1, &count, NULL);\ 
    102107} while(0) 
    103108