/* rio.c -- very very simple remote I/O routines * * Sarah Anderson * December 2001 * Laboratory for Computational Science & Engineering (LCSE) * University of Minnesota * All rights reserved. * * User callable: * * rio_read ( pathname, address, nbytes, offset ) * rio_write ( pathname, address, nbytes, offset ) * * Optional (automatically called by the above) * rio_start * rio_stop * * For this simple library, a pathname is (optionally) preceeded by * the hostname of the remote machine. * For example: * \\rend05\V:\data\dir\name \\rend05\name * or * //titan160//data/dir/name //titan160//name * * ------------- not sure the following is real useful, leave off for now * For convenience, the rio_prefix() routine can supply a simple prefix * (to prefix to all following read/write names) and/or a directory prefix * which will be inserted after the given hostname. * * For example, you may * * rio_prefix( "\\rend05\" ) * rio_directory("data\dir\" ) * rio_read ( "name", addr,nbytes,offset ) * rio_read ( "name2",addr,nbytes,offset ) *... * rio_prefix( "\\rend04\" ) * rio_read ()... * ------------- */ #ifdef _WIN32 #include #include #include #define read _read #define write _write #define open _open #define close _close #endif #include #include #include #include #include #ifndef _WIN32 #include "time.h" #include #include #endif #include #include "gm.h" #include "rio.h" #ifdef WIN32 #define CSLASH '\\' #define SLASH "\\" /* allow 64bit offset tupe for windows */ #define FOFFSET __int64 #else #define CSLASH '/' #define SLASH "/" #define FOFFSET unsigned #endif #define NOIDEA ((unsigned)(-1)) #define NOTFOUND ((unsigned)(-2)) /*------------ globals */ int Rio_Initialized=0; int Rio_Termn8server=0; /* set to end gmserver loop */ char Rioprefix[MXRIONAME]; struct gm_port *Port = NULL; /* ------- Preallocated memory buffers for gm/rio. */ /* The two 'large' buffers are MXRIOBYTES bytes, * and RioInput indicates which is currently offered as a receive buffer to gm. * RioOutput buffer is used when the pair is solely being used for disk read and * gm send. */ static int RioInput= 0; static int RioOutput=1; static void *Riobuffer[2]; static unsigned Riosize, Riolength; /* Two generic (CTS or STATUS) message buffers and one HEADER are allocated */ RioMessage_t *RioCTSbuffer, *RioSTATUSbuffer, *RioACKbuffer; RioHeader_t *RioHEADERbuffer; int RioCTSsize, RioSTATUSsize, RioHEADERsize, RioACKsize; /* We need to provide many (possibly) RioHeader and CTS buffers in the case * of many (possibly) incoming requests. Only the named ones are used here * for outgoing messages. */ #define NGMBUF 32 static void *RioCtrlBuffers[NGMBUF*2]; static void *RioHdrBuffers[NGMBUF]; static int ReceiveInProgress=0; /* client processing rio receive */ static int WriteInProgress=0; /* server processing rio_write, ie writing a file */ static int ReadInProgress=0; /* server processing rio_read, ie reading a file */ static int SendInProgress=0; /* server sending payload data */ static gm_alarm_t *Wakeup; /* Check for RIO work or timeouts this often */ typedef struct { unsigned priority, sender, port, nbytes; } SendInfo_t; static int ServerIdle=1; static int ServerServicing= -1; /* gmid of node the server is handling. Or, if gmclient, the * gmid the of the server it is interacting with. */ static double StartTime; /* wall time at which rio began */ FILE *Log; int Rio_Loglevel=0; /* 0 no output, 1 some, 2 lots */ /*------------ prototypes */ void rio_prefix( char *str ); int rio_write ( char *pathname, void *addr, unsigned nbytes, FOFFSET offset ); int rio_read ( char *pathname, void *addr, unsigned nbytes, FOFFSET offset ); int rio_ping ( char *pathname ); int rio_start (int port,FILE *log); void rio_stop (void); int rio_gmclient ( unsigned node, char *addr, unsigned nbytes_to_xfer ); void closefile(); /*------------- externals */ extern double getmillis_(); /*---------------------------------------------------------------------- */ char *timestamp() { static time_t ltime; char *p, *cp; time( <ime ); cp= ctime( <ime ); /* discard trailing newline */ p=cp; while(*p++) if ( *p=='\n') *p=' '; return cp; } /*------------------------------------------------------------------------ * Returns nonzero if ok, starts. Otherwise, likely GM problem */ int rio_start(int useport, FILE *log) { unsigned nbytes; unsigned long mask; int i; Log= log; if ( Log != NULL ) setbuf(Log,NULL); if ( Log != NULL ) fprintf(Log,"%s: rio_start on port %d Initialized=%d Log=%p\n", timestamp(),useport,Rio_Initialized,log ); if ( Rio_Initialized ) return 1; /* if ( Log != NULL ) fprintf(Log,"%s: about to gm_init\n", timestamp() ); */ if ( gm_init() ) { if ( Log!=NULL) fprintf(Log,"Could not do gm_init!!\n"); return 0; } if (gm_open(&Port, 0, useport, "rio", GM_API_VERSION_1_1) != GM_SUCCESS) { if ( Log!=NULL ) fprintf(Log,"Couldn't open port 'Port' = %d\n",useport); return 0; } Rioprefix[0]= 0; #if 0 if ( Log != NULL ) fprintf(Log,"Available rcv tokens: %d\n", gm_num_receive_tokens(Port) ); #endif /* Allocate (double) buffers for RIODATA */ /* For simplicity, all data read/written travels in fixed size message(s) */ Riosize= gm_min_size_for_length(MXRIOBYTES); Riolength= MXRIOBYTES; nbytes= 1<=2 ) fprintf(Log,"Riobuffer length is %d for data length %d \n", Riosize, nbytes ); /* Next allocate 3 generic control message buffers */ RioCTSsize= RioSTATUSsize= RioACKsize= gm_min_size_for_length( sizeof(RioMessage_t) ); nbytes= 1<type= RIOCTS; RioSTATUSbuffer= gm_dma_malloc( Port, nbytes ); RioSTATUSbuffer->type= RIOSTATUS; RioACKbuffer= gm_dma_malloc( Port, nbytes ); RioACKbuffer->type= RIOACK; /* gm_provide_receive_buffer_with_tag( Port, RioSTATUSbuffer, RioSTATUSsize, GM_HIGH_PRIORITY, RIOSTATUS ); gm_provide_receive_buffer_with_tag( Port, RioCTSbuffer, RioCTSsize, GM_HIGH_PRIORITY, RIOCTS ); gm_provide_receive_buffer_with_tag( Port, RioACKbuffer, RioACKsize, GM_HIGH_PRIORITY, RIOACK ); */ /* the above are used for sends only - use these for incoming. */ for(i=0; i<2*NGMBUF; ++i ) { RioCtrlBuffers[i]= gm_dma_malloc(Port,nbytes); gm_provide_receive_buffer_with_tag( Port, RioCtrlBuffers[i], RioACKsize, GM_HIGH_PRIORITY, RIOACK ); } /* Allocate a RIOHEADER buffer - we send it so no need to provide it to GM */ RioHEADERsize= gm_min_size_for_length( sizeof(RioHeader_t) ); nbytes= 1<type= RIOHEADER; /* gm_provide_receive_buffer_with_tag( Port, RioHEADERbuffer, RioHEADERsize, GM_HIGH_PRIORITY, RIOHEADER ); */ /* Now allocate and provide extra Header and Control buffers for simultaneous access */ for(i=0; i=2 ) fprintf(Log,"Rioctrl length %d mask %d\n", nbytes, mask ); Rio_Initialized= useport; if ( Log!=NULL && Rio_Loglevel>=2 ) fprintf(Log," .. Initialization complete port %d\n",useport); /* atexit( rio_stop ); */ return 1; } /*----------------------------------------------------------------------- * If the buffer was used to hold an incoming message, return it to GM. */ static void swapInput() { RioInput = 1 - RioInput; gm_provide_receive_buffer_with_tag( Port, Riobuffer[RioInput], Riosize, GM_LOW_PRIORITY, RioInput ); return; } static void swapOutput() { RioOutput = 1-RioOutput; return; } static volatile int CBdone=0; /* define CBWAIT while (!CBdone) ; */ #define CBWAIT /*------------------------------------------------------------------------ * NOop callback for send completion: needs error checking * The context is a pointer containing the information necessary to drop * the send should it fail. */ gm_send_completion_callback_t sendcb( struct gm_port *port, SendInfo_t *context, gm_status_t status ) { if ( status != GM_SUCCESS ) { if (Log!=NULL) { fprintf(Log,"%s: gm send error code %d '%s'", timestamp(),status, gm_strerror(status) ); if ( context != NULL ) { if (Log!=NULL) fprintf(Log,"%s: resuming gm send %d bytes to id %d port %d priority %d\n", timestamp(),context->nbytes, context->sender, context->port, context->priority ); gm_resume_sending( port, context->priority, context->sender,context->port, (void*)sendcb, NULL ); } } } CBdone=1; return 0; } /*------------------------------------------------------------------------ * NOop callback for send completion: needs error checking * The context is a pointer containing the information necessary to drop * the send should it fail. */ gm_send_completion_callback_t sendcb2( struct gm_port *port, SendInfo_t *context, gm_status_t status ) { if ( status != GM_SUCCESS ) { if (Log!=NULL) { fprintf(Log,"%s: gm send error2 code %d '%s'", timestamp(),status, gm_strerror(status) ); if ( context != NULL ) { if (Log!=NULL) fprintf(Log,"%s: dropping gm send2%d bytes to id %d port %d priority %d\n", timestamp(),context->nbytes, context->sender, context->port, context->priority ); gm_drop_sends( port, context->priority, context->sender,context->port, (void*)sendcb, NULL ); } } } CBdone=1; SendInProgress= 0; return 0; } /*------------------------------------------------------------------ * return status code to the client. */ void sendSTATUS( int status, unsigned sender, unsigned port ) { RioMessage_t *msg; static SendInfo_t si; double t,f; msg= RioSTATUSbuffer; msg->type= RIOSTATUS; msg->status= status; si.port= port; si.sender= sender; si.priority= GM_HIGH_PRIORITY; si.nbytes=sizeof(RioMessage_t); CBdone= 0; gm_send_with_callback(Port,msg,RioSTATUSsize,sizeof(RioMessage_t), GM_HIGH_PRIORITY, sender,port, (void*)sendcb, &si ); CBWAIT ReadInProgress= WriteInProgress= 0; closefile(); ServerIdle= 1; ServerServicing= -1; f=0; t= getmillis_(); if ( status >= 0 ) f= status/((t-StartTime)*1.0e+6); if (Log!=NULL && Rio_Loglevel >= 1) fprintf(Log,"%s: gm server returning status=%d %6.1f MB/sec\n\n", timestamp(),status,f ); } /*------------------------------------------------------------------ * send... uh... generic messages */ void sendCTS( unsigned sender, unsigned port ) { static SendInfo_t si; RioCTSbuffer->type= RIOCTS; si.port= port; si.sender= sender; si.priority= GM_HIGH_PRIORITY; si.nbytes= sizeof(RioMessage_t); CBdone= 0; gm_send_with_callback(Port,RioCTSbuffer,RioCTSsize,sizeof(RioMessage_t), GM_HIGH_PRIORITY, sender,port, (void*)sendcb,&si ); CBWAIT } /*------------------------------------------------------------------------ * Supply prefix to following read/writes, adding terminating slash if * needed. */ void rio_prefix(char *str) { int n; strncpy( Rioprefix, str, MXRIONAME ); n = strlen(Rioprefix); if ( n && Rioprefix[n-1] != CSLASH ) { Rioprefix[n-1]= CSLASH; Rioprefix[n]= 0; } } /*------------------------------------------------------------------------ * Parse strings like: * \\rend05\V:\dir\name * into a pathname and hostname component, here * "rend05" and "V:\dir\name" * * or, given the other slashes for 'nix * * //rend05//scr/dir/name ==> "rend05" "/scr/dir/name" * * The result hostname is given to GM to return a gmid * to talk to. This is the return value. The name, less * the hostname, is returned as 'path'. * * If the path is local, returns NOIDEA. * If a hostname is present but GM can't find it * returns NOTFOUND */ unsigned parsehostname( char *hostname, char *name, char *path ) { unsigned int n; char *cpo, *cpi; char lhostname[MXRIONAME]; /* Given , replace all its slashes with DOS slashes * so I never have to type the bloody things */ strcpy(path,name); #ifdef _WIN32 cpi= name; while (*cpi) { if ( *cpi == '/' ) *cpi= '\\'; ++cpi; } #endif /* If hostname is given (non null and non nil ) * then use it, assuming name is a simple pathname */ if ( hostname==NULL || !*hostname ) { if ( name[0]!=CSLASH || name[1]!=CSLASH ) return NOIDEA; /* copy up to the next slash, creating the hostname */ strcpy(lhostname,name+2); cpo= lhostname; cpi=name+2; while (*cpo && *cpo!=CSLASH ) { ++cpo; ++cpi; } if ( *cpo ) { *cpo=0; ++cpi; } /* copy remainder to the output string */ strcpy(path,cpi); #if 0 if ( Log!=NULL ) fprintf(Log,"Parsed hostname '%s' pathname '%s'\n", lhostname, path ); #endif } else strcpy(lhostname,hostname); n= gm_host_name_to_node_id( Port, lhostname ); if ( n == GM_NO_SUCH_NODE_ID ) { if ( Log!=NULL ) fprintf(Log,"%s: Cannot locate Myrinet node '%s'\n", timestamp(),lhostname ); return NOTFOUND; } else return n; } /*----------------------------------------------------------------------- * Wait for server to reply with a RIOACK. Should time out, not hang. * If the server cannot open the requested file the response will be a STATUS return * instead. * * Returns 0 on failure, else 1 */ int rio_gmack(unsigned server, double timeout) { gm_recv_event_t *e; RioMessage_t *msg; int done,incase; unsigned sender,port; void *maddr; unsigned char size, type; double t0,t1; done= 0; t0= t1= getmillis_(); while (!done) { e = gm_receive(Port); incase= gm_ntoh_u8(e->recv.type); switch (incase) { case GM_NO_RECV_EVENT: //fprintf(stderr,"none %d %d\n",t0,t1); t1= getmillis_() - t0; if ( t1 > timeout ) { if (Log!=NULL ) fprintf(Log,"rio: no reply from server %s\n", gm_node_id_to_host_name(Port,server) ); return 0; /* fail after 5 seconds */ } if ( t1 > 1.0 ) { gm_sleep(1); if (Log!=NULL) fprintf(Log,"gmack waiting %d.. ",CBdone); } break; case GM_HIGH_RECV_EVENT: case GM_FAST_HIGH_RECV_EVENT: maddr= gm_ntohp( e->recv.buffer ); size = gm_ntoh_u8( e->recv.size ); sender = gm_ntoh_u16( e->recv.sender_node_id ); port = gm_ntoh_u8( e->recv.sender_port_id ); if ( incase == GM_FAST_HIGH_RECV_EVENT ) msg = (RioMessage_t*) gm_ntohp(e->recv.message); else msg = (RioMessage_t*) gm_ntohp(e->recv.buffer); type = msg->type; /* Give the recv buffer back to GM */ gm_provide_receive_buffer_with_tag( Port, maddr, size, GM_HIGH_PRIORITY, RIOACK ); if ( type != RIOACK || sender!=server) { if (Log!=NULL) fprintf(Log,"rio: unexpected reply %d to RIOHEADER from server %s when expecting server %s ignored\n", type, gm_node_id_to_host_name(Port,sender), gm_node_id_to_host_name(Port,server) ); break; } else { done=1; } break; default: gm_unknown(Port, e); } } return 1; } /*------------------------------------------------------------------------ * Compose header message and send to the remote system. nonzero * indicates a write, else its a read. */ int startRIO( int ireadwrite, unsigned node, char *name, unsigned nbytes, FOFFSET offset ) { static SendInfo_t si; RioHeader_t msg; //msg= RioHEADERbuffer; strncpy( msg.rio.name, name, MXRIONAME ); msg.rio.ireadwrite= ireadwrite; msg.rio.nbytes= nbytes; msg.rio.offset= offset; msg.type= RIOHEADER; nbytes= sizeof(msg); memcpy( RioHEADERbuffer, &msg, nbytes ); #if 1 if (Log!=NULL) fprintf(Log,"%s: startrio target node=%d: %d (%s) nbytes=%d\n", timestamp(), node, ireadwrite, (ireadwrite==0)?"read":"write", nbytes ); #endif /* We're going to use the same buffer sizes for ALL data messages */ si.port= node; si.sender= node; si.priority= GM_HIGH_PRIORITY; si.nbytes= nbytes; CBdone= 0; gm_send_with_callback( Port, RioHEADERbuffer, RioHEADERsize, nbytes, GM_HIGH_PRIORITY, node, GMSERVERPORT, (void*)sendcb, &si ); /* fprintf(stderr,"waiting for ack.. "); */ /* Wait for the ack - should time out if cannot contact the server */ if ( !rio_gmack(node,30.0) ) return -1; if ( ireadwrite == 0 ) { ReceiveInProgress= 1; sendCTS( node, GMSERVERPORT ); } return 0; } /*------------------------------------------------------------------------ * Perform specified transfer. Does not return until the transfer is totally complete. */ int rio_readwrite( int ireadwrite, char *hostname, char *pathname, void *addr, unsigned nbytes, FOFFSET offset ) { char str[MXRIONAME]; int status; unsigned node; if ( !Rio_Initialized ) if ( !rio_start(GMCLIENTPORT,NULL) ) return -2; node= parsehostname( hostname, pathname, str ); if ( node == NOTFOUND || node == NOIDEA ) { perror(str); return -1; } /* send riostart message */ if ( status=startRIO( ireadwrite, node, str, nbytes, offset ) ) return status; /* enter gmloop, which will fill supplied buffer */ status= rio_gmclient( node, addr, nbytes ); return status; } /*----------------------------------------------------------------------- * ping the server named in the composite //hostname/pathname - returns nonzero if contact */ int rio_ping( char *pathname ) { char str[MXRIONAME]; unsigned node, nbytes; RioHeader_t msg; static SendInfo_t si; if ( !Rio_Initialized ) if ( !rio_start(GMCLIENTPORT,NULL) ) return -2; node= parsehostname( NULL, pathname, str ); if ( node == NOTFOUND ) { if ( Log != NULL ) perror(str); return 0; } else if ( node == NOIDEA ) return 0; /* local path */ msg.type= RIOPING; nbytes= sizeof(msg); memcpy( RioHEADERbuffer, &msg, nbytes ); si.port= node; si.sender= node; si.priority= GM_HIGH_PRIORITY; si.nbytes= nbytes; CBdone= 0; gm_send_with_callback( Port, RioHEADERbuffer, RioHEADERsize, nbytes, GM_HIGH_PRIORITY, node, GMSERVERPORT, (void*)sendcb,&si ); /* Wait for the ack - should time out if cannot contact the server */ if ( !rio_gmack(node,1.0) ) return 0; return 1; } /*------------------------------------------------------------------------ * Perform specified read. */ int rio_read( char *pathname, void *addr, unsigned nbytes, FOFFSET offset ) { return rio_readwrite( 0, NULL, pathname,addr,nbytes,offset ); } /*------------------------------------------------------------------------ * Perform specified write. */ int rio_write( char *pathname, void *addr, unsigned nbytes, FOFFSET offset ) { return rio_readwrite( 1, NULL, pathname,addr,nbytes,offset ); } /*------------------------------------------------------------------------ * Perform specified read. The hostname is passed in separately, without * slash delimeters. */ int rio_readh( char *hostname, char *pathname, void *addr, unsigned nbytes, FOFFSET offset ) { return rio_readwrite( 0, hostname, pathname,addr,nbytes,offset ); } /*------------------------------------------------------------------------ * Perform specified write. The hostname is passed in separately, without * slash delimeters. */ int rio_writeh( char *hostname, char *pathname, void *addr, unsigned nbytes, FOFFSET offset ) { return rio_readwrite( 1, hostname, pathname,addr,nbytes,offset ); } /*------------------------------------------------------------------------ */ void rio_stop() { int i; if (Log!=NULL) { fprintf(Log,"%s rio_stop\n",timestamp() ); fclose(Log); } if ( Rio_Initialized ) { gm_dma_free( Port, RioCTSbuffer ); gm_dma_free( Port, RioSTATUSbuffer ); gm_dma_free( Port, RioHEADERbuffer ); gm_dma_free( Port, Riobuffer[0] ); gm_dma_free( Port, Riobuffer[1] ); for(i=0; i<2*NGMBUF; ++i ) gm_dma_free( Port, RioCtrlBuffers[i] ); for(i=0; i=2) fprintf(Log,"***gmloop %s node=%d addr=%p nbytes=%d\n", timestamp(), node,addr,nbytes ); */ while (!done) { e = gm_blocking_receive(Port); incase= gm_ntoh_u8(e->recv.type); switch (incase) { case GM_NO_RECV_EVENT: //fprintf(stderr,"none\n"); break; case GM_HIGH_RECV_EVENT: case GM_FAST_HIGH_RECV_EVENT: maddr= gm_ntohp( e->recv.buffer ); tag= gm_ntoh_u8( e->recv.tag ); size = gm_ntoh_u8( e->recv.size ); /* This message may be RIOSTATUS, RIOCTS */ if ( incase == GM_FAST_HIGH_RECV_EVENT ) msg = (RioMessage_t*) gm_ntohp(e->recv.message); else msg = (RioMessage_t*) gm_ntohp(e->recv.buffer); type = msg->type; status= msg->status; /* Give the recv buffer back to GM */ gm_provide_receive_buffer_with_tag( Port, maddr, size, GM_HIGH_PRIORITY, tag ); sender = gm_ntoh_u16( e->recv.sender_node_id ); port = gm_ntoh_u8( e->recv.sender_port_id ); if ( type == RIOCTS ) { if ( sender != ServerServicing ) { if ( Log!=NULL ) fprintf(Log,"%s gmclient received unexpected RIOCTS from gmid %d not %d\n", timestamp(),sender,ServerServicing ); } //fprintf(stderr,"gmloop RIOCTS nbytesbuffered=%d nbytes=%d\n",nbytesbuffered, nbytes ); /* processing a riowrite- we can send a buffer and prepare the next */ if ( nbytes && nbytesbuffered == 0 ) { nbytesbuffered= nbytes; if ( nbytesbuffered > MXRIOBYTES ) nbytesbuffered= MXRIOBYTES; //fprintf(stderr,"initial copy %d bytes to %p\n", nbytesbuffered, addr ); memcpy( Riobuffer[1-RioOutput], addr, nbytesbuffered ); addr += nbytesbuffered; } if ( nbytes ) { /* Riolength not nbytesbuffered */ si.port= port; si.sender= sender; si.priority= GM_LOW_PRIORITY; si.nbytes= Riolength; CBdone= 0; gm_send_with_callback(Port,Riobuffer[1-RioOutput],Riosize,Riolength, GM_LOW_PRIORITY, sender,port, (void*)sendcb2,NULL ); CBWAIT nbytes -= nbytesbuffered; ip= Riobuffer[1-RioOutput]; //fprintf(stderr,"gmloop sending %d bytes from buffer %d %d %d %d %d\n",nbytesbuffered, // ip[0],ip[1],ip[2],ip[3],ip[4]); //fprintf(stderr,"gmloop sent %d bytes %d to go\n", nbytesbuffered, nbytes ); swapOutput(); } if ( nbytes != 0 ) { nbytesbuffered= nbytes; if (nbytesbuffered>MXRIOBYTES) nbytesbuffered=MXRIOBYTES; memcpy( Riobuffer[1-RioOutput], addr, nbytesbuffered ); addr += nbytesbuffered; } } else if ( type == RIOSTATUS ) { if ( sender != ServerServicing ) { if ( Log!=NULL ) fprintf(Log,"%s gmclient received unexpected RIOSTATUS from gmid %d not %d\n", timestamp(),sender,ServerServicing ); } /* if (Log!=NULL && Rio_Loglevel>=2) fprintf(Log,"gmloop %s RIOSTATUS %d\n", timestamp(), msg->status ); */ /* status= msg->status; */ done= 1; ReceiveInProgress= 0; ServerServicing= -1; } break; case GM_RECV_EVENT: len = gm_ntoh_u32( e->recv.length ); sender = gm_ntoh_u16( e->recv.sender_node_id ); port = gm_ntoh_u8( e->recv.sender_port_id ); /* if (Log!=NULL && Rio_Loglevel>=2) fprintf(Log,"gmclient %s RIODATA len=%d remaining=%d to addr=%p\n", timestamp(), len, nbytes, addr ); */ /* We have a RIODATA, the ONLY message which travels low priority. * We had better have a receive in progress. */ if ( !ReceiveInProgress ) { if (Log!=NULL) fprintf(Log,"Hey! unknown data at GM_RECV_EVENT, no receive running\n"); break; } if ( ServerServicing != sender ) { if ( Log!=NULL ) fprintf(Log,"%s gmclient received unexpected RIODATA from gmid %d not %d\n", timestamp(),sender,ServerServicing ); /* This should not ever happen- if so something is so wrong there is little * use trying to recover from it. This means "service is required". */ } ++ReceiveInProgress; /* dbg counter really */ swapInput(); sendCTS( sender,port ); //ip= (int*)Riobuffer[1-RioInput]; //fprintf(stderr,"buffer %d client data was: %d %d %d %d %d\n", 1-RioInput, // ip[0],ip[1],ip[2],ip[3], ip[4] ); /* don't overcopy the last message */ if ( len > nbytes ) len=nbytes; memcpy( addr, Riobuffer[1-RioInput], len ); addr += len; nbytes -= len; break; default: /* fprintf(stderr,"gmloop gm_unknown\n"); */ gm_unknown(Port, e); } } /* fprintf(stderr,"gmloop returning %d\n", status); */ return status; } #ifdef _WIN32 static HANDLE Fd; static OVERLAPPED FdOver; static __int64 FdOffset; /* Tracks current file offset, needed for windows i/o */ #else static int Fd; /* File descriptor to use to service rio read/write */ static unsigned FdOffset; #endif static unsigned FdXferred; /* returned from doread/dowrite */ #ifdef _WIN32 /* --------------------Use weirdo Windows structs */ static void winerror(char *msg) { LPTSTR lpMsgBuf; FormatMessage( FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language (LPTSTR) &lpMsgBuf, 0, NULL); if ( msg != NULL && Log!=NULL ) fprintf(Log,"%s\n", msg ); if (Log!=NULL) fprintf(Log,"reason: %s",lpMsgBuf); return; } /*-------------------- Figure out if we can use direct i/o, that is unbuffered. */ static unsigned long icandio( RioHeader_t *hdr ) { DWORD bytes,numFreeClusters,totalNumClusters,sectorsPerCluster; int ok, i; char name[256], *cp; /* need the DIRECTORY name - find the first trailing slash and * replace the next char (if any) with 0. If none, use NULL which causes the current * working directory to be used. */ strcpy(name,hdr->rio.name); i= 0; while ( name[i] && name[i]!=CSLASH ) ++i; if ( name[i] == 0 ) cp= NULL; else { cp=name; if (i<255) name[i+1]=0; } ok= GetDiskFreeSpace( name, §orsPerCluster, &bytes, &numFreeClusters, &totalNumClusters ); /* This is the granularity, the sector size that * the file offset and transfer count must have */ if ( !ok ) { if ( Log != NULL ) { fprintf(Log,"Cannot determine sector size for file '%s'\n",name ); winerror("Get sector size"); } return 0; } //fprintf(stderr,"cluster size=%d\n", bytes ); if ( hdr->rio.offset % bytes || hdr->rio.nbytes % bytes ) { //fprintf(stderr,"nope\n"); return 0; } //fprintf(stderr,"icandio!\n"); return FILE_FLAG_NO_BUFFERING; } /*--------------------- */ static int openfile( RioHeader_t *hdr ) { FdOffset= hdr->rio.offset; if ( hdr->rio.ireadwrite == 0 ) { Fd = CreateFile( hdr->rio.name, GENERIC_READ, FILE_SHARE_READ, NULL, OPEN_EXISTING, (FILE_ATTRIBUTE_READONLY | FILE_FLAG_OVERLAPPED | FILE_FLAG_SEQUENTIAL_SCAN | icandio(hdr) ), // | FILE_FLAG_NO_BUFFERING), NULL); if ( Fd==INVALID_HANDLE_VALUE ) { if (Log!=NULL) fprintf(Log,"can't open '%s'\n", hdr->rio.name ); winerror("huh"); return -1; } } else { /* open the output file */ Fd = CreateFile( hdr->rio.name, GENERIC_WRITE, 0, NULL, OPEN_ALWAYS, ( FILE_FLAG_OVERLAPPED | FILE_FLAG_SEQUENTIAL_SCAN | icandio(hdr) ), // | FILE_FLAG_NO_BUFFERING), FILE_FLAG_WRITE_THROUGH | NULL); if (Fd == INVALID_HANDLE_VALUE) { return -1; } } return 0; } void closefile() { CloseHandle(Fd); } #else /* --------------------Good old unix */ static int openfile( RioHeader_t *hdr ) { int flags, modes, n; if ( hdr->rio.ireadwrite ) { /* write */ flags= O_RDWR | O_CREAT; modes= S_IREAD | S_IWRITE; Fd= open( hdr->rio.name, flags, modes ); } else { /* read */ flags= O_RDONLY; Fd= open( hdr->rio.name, flags ); } if ( Fd == -1 ) return Fd; /* Should use 64bit offset if available */ n= lseek( Fd, (long)hdr->rio.offset, SEEK_SET ); if ( n== -1 || (unsigned)n != hdr->rio.offset ) return -1; return 0; } void closefile() { /* fdatasync(Fd); */ close(Fd); } #endif /*------------------------------------------------------------------------ * Start read. Finish, too unless windows. */ static unsigned doread(void *buffer, unsigned nbytes) { int ok; #ifdef _WIN32 FdOver.Offset = (unsigned long)(FdOffset & 0xffffffff); FdOver.OffsetHigh = (unsigned long)(FdOffset >> 32); //fprintf(stderr,"readfile offset=%lld \n", FdOffset ); ok = ReadFile(Fd, buffer, nbytes, NULL, &FdOver ); #if 0 if ( !ok && GetLastError() != ERROR_IO_PENDING ) { winerror("rioserver can't read file"); return -1; } #endif return 0; #else FdXferred= read(Fd,buffer,nbytes); return 0; #endif } /*------------------------------------------------------------------------ * Start write. Finish, too unless windows. */ static unsigned dowrite(void *buffer, unsigned nbytes) { int ok; #ifdef _WIN32 FdOver.Offset = (unsigned long)(FdOffset & 0xffffffff); FdOver.OffsetHigh = (unsigned long)(FdOffset >> 32); //fprintf(stderr,"writefile offset=%lld \n", FdOffset ); ok = WriteFile(Fd, buffer, nbytes, &FdXferred, &FdOver ); #if 0 if ( !ok && GetLastError() !=ERROR_IO_PENDING ) { winerror("rioserver can't write file"); return -1; } #endif return 0; #else FdXferred= write(Fd,buffer,nbytes); return 0; #endif } #ifdef _WIN32 void cancelio() { CancelIo(Fd); } unsigned waitio() { if ( GetOverlappedResult( Fd,&FdOver,&FdXferred, TRUE ) ) { //fprintf(stderr,"xferred %d bytes\n", FdXferred); FdOffset += FdXferred; return FdXferred; } else { /* Need to return a negative error code.?? */ FdXferred= GetLastError(); return NOIDEA; } } #else void cancelio() { } unsigned waitio() { /* fdatasync(Fd); */ return FdXferred; } #endif /*-------------------------------------------------------------------- * Wait for outstanding overlapped i/o of bytes. * remain to be transferred to or from the client. * Returns nonzero when the operation is complete (errored out or finished). * Update with last transfer count and accumulated to */ static int checkio( unsigned *iostatus, unsigned *nbytes_disk, int nbytes, unsigned nbytesbuffered, unsigned sender, unsigned port, double t0 ) { double t1; #if 0 int *ip; #endif if ( *iostatus==0 ) { /* so far so good.. check completion status */ *iostatus= waitio(); t1= getmillis_(); *nbytes_disk += *iostatus; #if 0 fprintf(stderr,"server i/o %d bytes in %lf sec %lf MB/sec\n", *iostatus,t1-t0,(double)(*iostatus)/(1.0e+6*(t1-t0)) ); ip= (int*)Riobuffer[1-RioInput]; fprintf(stderr,"server i/o was: %d %d %d %d %d\n", ip[0],ip[1],ip[2],ip[3], ip[4] ); #endif } else { /* some error happened back on the dowrite/doread call */ if (Log!=NULL) fprintf(Log,"gmserver iofailure sending status %d to sender %d\n", *iostatus, sender ); sendSTATUS( *iostatus, sender,port ); return 1; } if ( *iostatus != nbytesbuffered ) { if (Log!=NULL) fprintf(Log,"gmserver iofailure buffered=%d iostatus=%d sender=%d\n", nbytesbuffered,iostatus, sender ); sendSTATUS( *iostatus, sender,port ); return 1; } return 0; } /*------------------------------------------------- * circular queue (fifo) for rio requests. */ static int ServerHead=0, ServerTail=0; #define MXRIOREQUESTS 32 typedef struct { unsigned sender, port; /* client */ RioHeader_t hdr; } serverRequest_t; serverRequest_t ServerRequests[MXRIOREQUESTS]; /* Store whole request for FIFO order processing */ static int addRequest( RioHeader_t *hdr, unsigned sender, unsigned port ) { int next; serverRequest_t *req; next= (ServerTail+1)%MXRIOREQUESTS; if ( next == ServerHead ) return 0; /* Can't, queue full */ req= &ServerRequests[ServerTail]; //fprintf(stderr,"TRYING to add sender,port=%d %d name %s at %d\n",sender,port, // hdr->rio.name, ServerTail ); memcpy( &req->hdr, hdr, sizeof(RioHeader_t) ); req->port= port; req->sender= sender; ServerTail= next; //fprintf(stderr,"added rio newtail=%d\n",next); return 1; } /* Returns NULL if there are none */ static RioHeader_t *removeRequest(unsigned *sender, unsigned *port) { serverRequest_t *req; if ( ServerHead == ServerTail ) return NULL; req= &ServerRequests[ServerHead]; ServerHead= (ServerHead+1)%MXRIOREQUESTS; *sender= req->sender; *port= req->port; //fprintf(stderr,"returning %d %d %s\n", *sender,*port, req->hdr.rio.name ); return &req->hdr; } static volatile int Buzz=0; static void alarmRang( void *arg ) { Buzz=1; //fprintf(stderr,"brring.."); gm_set_alarm( Port, Wakeup, 2000000, alarmRang, Wakeup ); return; } /*------------------------------------------------------------------------------------ * See rio.h for a sketch of the communication structures. * The server gm loop handles incoming requests for i/o from clients (RIOHEADER), * and saves them for FIFO execution. */ void rio_gmserver() { gm_recv_event_t *e; static SendInfo_t si; RioMessage_t *msg; RioHeader_t *hdr; void *addr; int firstwrite; int status, done, incase; unsigned nbytes, nbytes_disk, sender,port; unsigned char size, type, tag; unsigned len, nbytesbuffered, iostatus; double tnow, tclient, tsend; double t0; int *ip; double tim0,tim1, dt; int nap=1; Wakeup= (struct gm_alarm*)gm_dma_malloc(Port,sizeof(gm_alarm_t) ); gm_initialize_alarm( Wakeup ); //fprintf(stderr,"alarm initialized\n"); gm_set_alarm( Port, Wakeup, 2000000, alarmRang,NULL ); /* That is, 2.0Sec */ done= 0; iostatus= 0; /* pending io status, 0 ==> ok */ nbytesbuffered= 0; /* number of bytes in io */ tim0= getmillis_(); while (!done) { /* If the request queue is empty, block */ if ( ServerHead == ServerTail ) e= gm_blocking_receive_no_spin(Port); else e= gm_receive(Port); //e = gm_blocking_receive(Port); incase= gm_ntoh_u8( e->recv.type ); switch (incase) { case GM_ALARM_EVENT: gm_unknown(Port,e); /* Check for timed out client connections. There are two sorts, * rio_write --expected RIODATA not received in timeout interval * rio_read --expected RIOCTS not received in timeout interval */ if ( Buzz ) { if ( Rio_Termn8server ) { rio_stop(); return; } //fprintf(stderr," Alaaaaarm! Buzz=%d ",Buzz); Buzz=0; tnow= getmillis_(); if ( SendInProgress && (tnow-tsend > 15.0) ) { if ( Log!=NULL ) fprintf(Log,"%s: dangling send %lf secs\n", timestamp(),tnow-tsend); } if ( ReadInProgress|WriteInProgress && (tnow-tclient > 30.0) ) { /* Its been more than 30 seconds since an expected op. * Cancel, without sending a message to the (probably dead) client. */ cancelio(); closefile(); if ( ReadInProgress ) { if ( Log!=NULL ) fprintf(Log,"%s: Timeout: cancelling read %d\n", timestamp(),ReadInProgress); } else if ( WriteInProgress ) { if ( Log!=NULL ) fprintf(Log,"%s: Timeout: cancelling write %d\n", timestamp(), WriteInProgress); } ReadInProgress= WriteInProgress= 0; ServerIdle= 1; } } /* break; // carry on checking for new rios */ case GM_NO_RECV_EVENT: if ( !ServerIdle ) break; /* never mind, I'm busy */ /* Dispatch pending rio request if any */ hdr= removeRequest(&sender,&port); if ( hdr == NULL ) { tim1= getmillis_(); /* sleep a bit if we're spinning */ dt= tim1-tim0; // if ( dt > 1.0 ) gm_sleep(nap); break; } tim0= tim1; ServerIdle= 0; ServerServicing= sender; nbytes= hdr->rio.nbytes; /* Acknowledge the requester who has been patiently waiting */ msg= RioACKbuffer; msg->type= RIOACK; si.port= port; si.sender= sender; si.priority= GM_HIGH_PRIORITY; si.nbytes= sizeof(RioMessage_t); CBdone= 0; gm_send_with_callback(Port,msg,RioACKsize,sizeof(RioMessage_t), GM_HIGH_PRIORITY, sender,port, (void*)sendcb,&si ); nbytes_disk= 0; StartTime= getmillis_(); if (Log!=NULL && Rio_Loglevel>=2) fprintf(Log,"%s: gm server %s '%s' nbytes=%u @%u\n", timestamp(), (hdr->rio.ireadwrite==0)?"read":"write", hdr->rio.name, nbytes, hdr->rio.offset ); status= openfile( hdr ); if ( status != 0 ) { if (Log!=NULL) fprintf(Log,"can't open\n"); /* return immediate error code */ sendSTATUS( status,sender,port ); break; } if ( hdr->rio.ireadwrite ) { /* write */ //fprintf(stderr,"gm server sending CTS\n"); WriteInProgress= 1; firstwrite= 1; sendCTS( sender, GMCLIENTPORT ); tclient= getmillis_(); } else { /* read */ ReadInProgress= 1; /* debugging/validation */ /* read: read in the first buffer to prepare for servicing the CTS */ nbytesbuffered= nbytes; if ( nbytesbuffered > MXRIOBYTES ) nbytesbuffered= MXRIOBYTES; t0= StartTime; iostatus= doread(Riobuffer[1-RioOutput],nbytesbuffered); tclient= getmillis_(); } break; case GM_HIGH_RECV_EVENT: case GM_FAST_HIGH_RECV_EVENT: addr= gm_ntohp( e->recv.buffer ); tag= gm_ntoh_u8( e->recv.tag ); sender = gm_ntoh_u16( e->recv.sender_node_id ); port = gm_ntoh_u8( e->recv.sender_port_id ); len = gm_ntoh_u32( e->recv.length ); size = gm_ntoh_u8( e->recv.size ); /* This message may be RIOHEADER, RIOCTS, RIOPING */ if ( incase == GM_FAST_HIGH_RECV_EVENT ) msg= (RioMessage_t*) gm_ntohp( e->recv.message ); else msg = (RioMessage_t*) gm_ntohp(e->recv.buffer); type = msg->type; if ( type == RIOHEADER ) addRequest( (RioHeader_t*)msg,sender,port ); if ( type == RIOPING ) { if (Log!=NULL && Rio_Loglevel>=2 ) { fprintf(Log,"server ping ack to gmid %d '%s'\n", sender,gm_node_id_to_host_name(Port,sender) ); } /* Acknowledge the pinger */ msg= RioACKbuffer; msg->type= RIOACK; si.port= port; si.sender= sender; si.priority= GM_HIGH_PRIORITY; si.nbytes= sizeof(RioMessage_t); CBdone= 0; gm_send_with_callback(Port,msg,RioACKsize,sizeof(RioMessage_t), GM_HIGH_PRIORITY, sender,port, (void*)sendcb,&si ); } /* Return the buffer to GM */ gm_provide_receive_buffer_with_tag( Port, addr, size, GM_HIGH_PRIORITY, tag ); if ( type == RIOCTS ) { if ( ServerServicing != sender ) { if ( Log!=NULL ) fprintf(Log,"%s gmserver received unexpected RIOCTS from gmid %d not %d\n", timestamp(),sender,ServerServicing ); /* This should not ever happen- if so something is so wrong there is little * use trying to recover from it. This means "service is required". */ } tclient= getmillis_(); /* if (Log!=NULL && Rio_Loglevel>=2) fprintf(Log, "gm server %s RIOCTS nbytesbuffered=%d nbytes=%d\n", timestamp(), nbytesbuffered,nbytes); */ if ( nbytes == 0 ) { sendSTATUS( nbytes_disk, sender, port ); break; } ++ReadInProgress; if ( nbytesbuffered && checkio( &iostatus, &nbytes_disk, nbytes, nbytesbuffered, sender,port,t0 ) ) break; if ( nbytes == 0 ) { // fprintf(stderr,"1 sending status %d\n", nbytes_disk ); sendSTATUS( nbytes_disk, sender, port ); break; } /* Send the buffer just read*/ swapOutput(); nbytes -= nbytesbuffered; /* if (Log!=NULL && Rio_Loglevel>=2) fprintf(Log, "gm server %s sending %d bytes to gmid %d port %d\n", timestamp(), Riolength, sender,port ); */ /* send Riolength, not nbytesbuffered */ si.port= port; si.sender= sender; si.priority= GM_LOW_PRIORITY; si.nbytes= Riolength; CBdone= 0; SendInProgress= 1; tsend= getmillis_(); gm_send_with_callback(Port,Riobuffer[RioOutput],Riosize,Riolength, GM_LOW_PRIORITY,sender,port, (void*)sendcb2,&si ); CBWAIT #if 0 ip= (int*)Riobuffer[RioOutput]; fprintf(stderr,"server sending: %d %d %d %d %d\n", ip[0],ip[1],ip[2],ip[3], ip[4] ); #endif /* Start read of next buffer */ nbytesbuffered= nbytes; if (nbytesbuffered > MXRIOBYTES) nbytesbuffered=MXRIOBYTES; if ( nbytes ) { t0=getmillis_(); iostatus= doread( Riobuffer[1-RioOutput], nbytesbuffered ); } //else { // //fprintf(stderr,"88 sending status\n"); // sendSTATUS( nbytes_disk, sender, port ); //} } else if ( type != RIOHEADER && type != RIOPING ) if (Log!=NULL) fprintf(Log,"rioserver: unknown message type %d ignored\n",type); break; case GM_RECV_EVENT: tclient= getmillis_(); len = gm_ntoh_u32( e->recv.length ); sender = gm_ntoh_u16( e->recv.sender_node_id ); port = gm_ntoh_u8( e->recv.sender_port_id ); /* if (Log!=NULL && Rio_Loglevel>=2) fprintf(Log, "gmserver: %s RIODATA bytes=%d nbytes=%d\n", timestamp(), len, nbytes ); */ #if 0 ip= (int*)Riobuffer[RioInput]; fprintf(stderr,"server received: %d %d %d %d %d\n", ip[0],ip[1],ip[2],ip[3], ip[4] ); #endif /* We have a RIODATA, the only message which travels low priority. * We had better be serving a rio write */ if ( ServerServicing != sender ) { if ( Log!=NULL ) fprintf(Log,"%s gmserver received unexpected RIODATA from gmid %d not %d\n", timestamp(),sender,ServerServicing ); /* This should not ever happen- if so something is so wrong there is little * use trying to recover from it. This means "service is required". */ } swapInput(); /* The last buffer may be 'too' big */ if ( len > nbytes ) len= nbytes; /* nbytes is the count of bytes sent to be written */ nbytes -= len; /* If there was a previous buffer, make sure its written first. */ if ( !firstwrite ) { if ( checkio( &iostatus, &nbytes_disk, nbytes, nbytesbuffered, sender, port, t0 ) ) break; } firstwrite=0; if ( nbytes ) { /* if (Log!=NULL && Rio_Loglevel>=2) fprintf(Log,"gmserver %s sending CTS\n",timestamp() ); */ sendCTS( sender,port ); } nbytesbuffered= len; t0= getmillis_(); iostatus= dowrite( Riobuffer[1-RioInput], nbytesbuffered ); ip= (int*)Riobuffer[1-RioInput]; //fprintf(stderr,"server writing: %d %d %d %d %d\n", // ip[0],ip[1],ip[2],ip[3], ip[4] ); if ( nbytes == 0 ) { /* wait for the write.. then exit with a status send */ checkio( &iostatus, &nbytes_disk, nbytes, nbytesbuffered, sender,port,t0 ); sendSTATUS( nbytes_disk, sender, port ); } break; default: /* fprintf(stderr,"gm_unknown\n"); */ gm_unknown(Port, e); } } }