/[MITgcm]/MITgcm_contrib/llc_hires/llc_1080/code-async/recvTask.c
ViewVC logotype

Contents of /MITgcm_contrib/llc_hires/llc_1080/code-async/recvTask.c

Parent Directory Parent Directory | Revision Log Revision Log | View Revision Graph Revision Graph


Revision 1.1 - (show annotations) (download)
Sun Sep 29 22:43:01 2013 UTC (11 years, 10 months ago) by dimitri
Branch: MAIN
File MIME type: text/plain
adding code-async for llc1080

1
2 // Code to do the m-on-n fan-in, recompositing, and output, of data
3 // tiles for mitGCM.
4 //
5 // There are aspects of this code that would be simpler in C++, but
6 // we deliberately wrote the code in ansi-C to make linking it with
7 // the Fortran main code easier (should be able to just include the
8 // .o on the link line).
9
10
11 #include "PACKAGES_CONFIG.h"
12
13 #include <stdio.h>
14 #include <unistd.h>
15 #include <stdlib.h>
16 #include <string.h>
17 #include <sys/types.h>
18 #include <sys/stat.h>
19 #include <fcntl.h>
20
21 #include <mpi.h>
22 #include <alloca.h>
23
24
25 #include <stdint.h>
26 #include <errno.h>
27
28 #define DEBUG 1
29
30 #if (DEBUG >= 2)
31 #define FPRINTF fprintf
32 #else
33 #include <stdarg.h>
34 void FPRINTF(FILE *fp,...){return;}
35 #endif
36
37
38 // Define our own version of "assert", where we sleep rather than abort.
39 // This makes it easier to attach the debugger.
40 #if (DEBUG >= 1)
41 #define ASSERT(_expr) \
42 if (!(_expr)) { \
43 fprintf(stderr, "ASSERT failed for pid %d : `%s': %s: %d: %s\n", \
44 getpid(), #_expr, __FILE__, __LINE__, __func__); \
45 sleep(9999); \
46 }
47 #else
48 #define ASSERT assert
49 #endif
50
51
52
53 // If numRanksPerNode is set to be > 0, we just use that setting.
54 // If it is <= 0, we dynamically determine the number of cores on
55 // a node, and use that. (N.B.: we assume *all* nodes being used in
56 // the run have the *same* number of cores.)
57 int numRanksPerNode = 0;
58
59 // Just an error check; can be zero (if you're confident it's all correct).
60 #define numCheckBits 2
61 // This bitMask definition works for 2's complement
62 #define bitMask ((1 << numCheckBits) - 1)
63
64
65 ///////////////////////////////////////////////////////////////////////
66 // Info about the data fields
67
68 // double for now. Might also be float someday
69 #define datumType double
70 #define datumSize (sizeof(datumType))
71
72
73 // Info about the data fields. We assume that all the fields have the
74 // same X and Y characteristics, but may have either 1 or NUM_Z levels.
75 //
76 // the following 3 values are required here. Could be sent over or read in
77 // with some rearrangement in init routines
78 //
79 #define NUM_X 1080
80 #define NUM_Y 14040L // get rid of this someday
81 #define NUM_Z 90
82 #define MULTDIM 7
83 #define twoDFieldSizeInBytes (NUM_X * NUM_Y * 1 * datumSize)
84 #define threeDFieldSizeInBytes (twoDFieldSizeInBytes * NUM_Z)
85 #define multDFieldSizeInBytes (twoDFieldSizeInBytes * MULTDIM)
86
87 // Info about the data tiles. We assume that all the tiles are the same
88 // size (no odd-sized last piece), they all have the same X and Y
89 // characteristics (including ghosting), and are full depth in Z
90 // (either 1 or NUM_Z as appropriate).
91 //
92 // all these data now sent over from compute ranks
93 //
94 int TILE_X = -1;
95 int TILE_Y = -1;
96 int XGHOSTS = -1;
97 int YGHOSTS = -1;
98
99 // Size of one Z level of a tile (NOT one Z level of a whole field)
100 int tileOneZLevelItemCount = -1;
101 int tileOneZLevelSizeInBytes = -1;
102
103
// Maps a one-character field ID (as used in the epoch tables below)
// to the number of Z levels that field carries.
typedef struct dataFieldDepth {
    char dataFieldID;   // one-character field tag
    int numZ;           // number of Z levels (1, NUM_Z, or MULTDIM)
} dataFieldDepth_t;

// Depth of every known field.
// NOTE(review): 'd' appears twice in this table (both with depth 1).
// If lookups take the first match, the second entry is dead -- confirm
// the duplicate is intentional.
dataFieldDepth_t fieldDepths[] = {
    { 'u', NUM_Z },
    { 'v', NUM_Z },
    { 'w', NUM_Z },
    { 't', NUM_Z },
    { 's', NUM_Z },
    { 'x', NUM_Z },
    { 'y', NUM_Z },
    { 'n', 1 },
    { 'd', 1 },
    { 'h', 1 },
    { 'a', MULTDIM }, // seaice, 7 == MULTDIM in SEAICE_SIZE.h
    { 'b', 1 },
    { 'c', 1 },
    { 'd', 1 },       // NOTE(review): duplicate of the 'd' entry above
    { 'e', 1 },
    { 'f', 1 },
    { 'g', 1 },
};
#define numAllFields (sizeof(fieldDepths)/sizeof(dataFieldDepth_t))
129
130
131
132 ///////////////////////////////////////////////////////////////////////
133 // Info about the various kinds of i/o epochs
// Per-field bookkeeping for one kind of i/o epoch: the communicators
// used to register and receive the field, plus its layout and output
// destination.
typedef struct epochFieldInfo {
    char dataFieldID;               // one-character field tag (matches fieldDepths)
    MPI_Comm registrationIntercomm; // compute<->io intercomm used at registration
    MPI_Comm dataIntercomm;         // compute<->io intercomm the tiles arrive on
    MPI_Comm ioRanksIntracomm;      // intracomm of the io ranks handling this field
    int tileCount;                  // number of tiles expected per epoch
    int zDepth; // duplicates the fieldDepth entry; filled in automatically
    char filenameTemplate[128];     // printf template, args: (gcmIter, "data"/"meta")
    long int offset;                // byte offset of this field within the output file
    int pickup;                     // 0 = normal dump, 1 = pickup, 2 = seaice pickup
} fieldInfoThisEpoch_t;
145
// The normal i/o dump: fields 'u', 'v', 't', 'n' (Eta), each written
// to its own file at offset 0 with pickup == 0 (float output).
// The '\0' row is the list terminator.
fieldInfoThisEpoch_t fieldsForEpochStyle_0[] = {
    { 'u', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "U.%010d.%s", 0, 0 },
    { 'v', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "V.%010d.%s", 0, 0 },
    { 't', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "T.%010d.%s", 0,0 },
    { 'n', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "Eta.%010d.%s", 0,0 },
    {'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "", 0,0 },
};
154
155
// pickup file: all fields share one file per epoch; each row's offset
// is the field's byte position in that file (six 3-D fields first,
// then the 2-D fields packed after them).  pickup == 1 (double output).
fieldInfoThisEpoch_t fieldsForEpochStyle_1[] = {
    { 'u', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 0 + twoDFieldSizeInBytes * 0, 1},
    { 'v', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 0, 1},
    { 't', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 2 + twoDFieldSizeInBytes * 0, 1},
    { 's', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 3 + twoDFieldSizeInBytes * 0, 1},
    { 'x', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 4 + twoDFieldSizeInBytes * 0, 1},
    { 'y', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 5 + twoDFieldSizeInBytes * 0, 1},
    { 'n', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 0, 1},
    { 'd', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 1, 1},
    { 'h', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 2, 1},
    {'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "", 0 ,1},
};
169
170
// seaice pickup: 'a' is MULTDIM levels deep, the rest are 2-D.
// NOTE(review): rows 'g','e','f' are not listed in offset order; the
// offset column (not row order) fixes the file layout -- confirm the
// ordering is intentional.
// NOTE(review): the terminator row omits the trailing pickup member
// (defaults to 0 while data rows use 2); harmless as long as the
// terminator is never written, but inconsistent with the other tables.
fieldInfoThisEpoch_t fieldsForEpochStyle_2[] = {
    { 'a', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 0 + twoDFieldSizeInBytes * 0, 2},
    { 'b', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 0, 2},
    { 'c', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 1, 2},
    { 'd', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 2, 2},
    { 'g', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 3, 2},
    { 'e', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 4, 2},
    { 'f', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 5, 2},
    {'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "", 0 },
};
182
183
184 fieldInfoThisEpoch_t *epochStyles[] = {
185 fieldsForEpochStyle_0,
186 fieldsForEpochStyle_1,
187 fieldsForEpochStyle_2,
188 };
189 int numEpochStyles = (sizeof(epochStyles) / sizeof(fieldInfoThisEpoch_t *));
190
191
192 typedef enum {
193 cmd_illegal,
194 cmd_newEpoch,
195 cmd_epochComplete,
196 cmd_exit,
197 } epochCmd_t;
198
199
200 ///////////////////////////////////////////////////////////////////////
201
202 // Note that a rank will only have access to one of the Intracomms,
203 // but all ranks will define the Intercomm.
204 MPI_Comm ioIntracomm = MPI_COMM_NULL;
205 int ioIntracommRank = -1;
206 MPI_Comm computeIntracomm = MPI_COMM_NULL;
207 int computeIntracommRank = -1;
208 MPI_Comm globalIntercomm = MPI_COMM_NULL;
209
210
211 #define divCeil(_x,_y) (((_x) + ((_y) - 1)) / (_y))
212 #define roundUp(_x,_y) (divCeil((_x),(_y)) * (_y))
213
214
215
// State of a tile-receive buffer: it lives on exactly one of the two
// singly-linked lists below (free or in use).
typedef enum {
    bufState_illegal,
    bufState_Free,    // on the freeTileBufs_ptr list
    bufState_InUse,   // on the inUseTileBufs_ptr list; Isends may be pending
} bufState_t;


// Header for one tile-receive buffer.
typedef struct buf_header{
    struct buf_header *next;  // next buffer on the free or in-use list
    bufState_t state;
    int requestsArraySize;    // length of requests[] (== max intracomm size)
    MPI_Request *requests;    // Isend handles for the redistributed z-slabs
    uint64_t ck0;             // overrun sentinel (0xdeadbeefdeadbeef)
    char *payload; // [tileOneZLevelSizeInBytes * NUM_Z]; // Max payload size
    uint64_t ck1; // now rec'vd from compute ranks & dynamically allocated in
} bufHdr_t; // allocateTileBufs


bufHdr_t *freeTileBufs_ptr = NULL;   // buffers available for the next tile
bufHdr_t *inUseTileBufs_ptr = NULL;  // buffers with redistribution still pending

int maxTagValue = -1;
int totalNumTiles = -1;
239
// Copy *n values from d to f, converting double -> float (precision
// truncated).  The ragged head (*n % 16 elements) is copied with a
// plain scalar loop; the remainder is handled 16 elements per pass
// with x86 SSE cvtsd2ss plus software prefetching.  x86-64 only.
// NOTE: byte-swapping is still not done here (see header comment);
// the prefetcht0 at d[i+256] forms an address past the end of the
// array, which is harmless since prefetch never faults.
// Fix: the definition previously had an implicit int return type,
// which is invalid since C99; it returns nothing, so declare void.
void
memcpy_r8_2_r4 (float *f, double *d, long long *n)
{
  long long i, rem;
  rem = *n%16LL;
  // Scalar copy of the ragged head so the unrolled loop stays aligned
  for (i = 0; i < rem; i++) {
    f [i] = d [i];
  }
  // Unrolled SSE conversion, 16 elements per iteration
  for (i = rem; i < *n; i += 16) {
    __asm__ __volatile__ ("prefetcht0 %0 # memcpy_r8_2_r4.c 10" : : "m" (d [i + 256 + 0]) );
    __asm__ __volatile__ ("prefetcht0 %0 # memcpy_r8_2_r4.c 11" : : "m" (f [i + 256 + 0]) );
    __asm__ __volatile__ ("cvtsd2ss %0, %%xmm0 # memcpy_r8_2_r4.c 12" : : "m" (d [i + 0]) : "%xmm0");
    __asm__ __volatile__ ("movss %%xmm0, %0 # memcpy_r8_2_r4.c 13" : "=m" (f [i + 0]) : : "memory");
    __asm__ __volatile__ ("cvtsd2ss %0, %%xmm1 # memcpy_r8_2_r4.c 14" : : "m" (d [i + 1]) : "%xmm1");
    __asm__ __volatile__ ("movss %%xmm1, %0 # memcpy_r8_2_r4.c 15" : "=m" (f [i + 1]) : : "memory");
    __asm__ __volatile__ ("cvtsd2ss %0, %%xmm2 # memcpy_r8_2_r4.c 16" : : "m" (d [i + 2]) : "%xmm2");
    __asm__ __volatile__ ("movss %%xmm2, %0 # memcpy_r8_2_r4.c 17" : "=m" (f [i + 2]) : : "memory");
    __asm__ __volatile__ ("cvtsd2ss %0, %%xmm3 # memcpy_r8_2_r4.c 18" : : "m" (d [i + 3]) : "%xmm3");
    __asm__ __volatile__ ("movss %%xmm3, %0 # memcpy_r8_2_r4.c 19" : "=m" (f [i + 3]) : : "memory");
    __asm__ __volatile__ ("cvtsd2ss %0, %%xmm4 # memcpy_r8_2_r4.c 20" : : "m" (d [i + 4]) : "%xmm4");
    __asm__ __volatile__ ("movss %%xmm4, %0 # memcpy_r8_2_r4.c 21" : "=m" (f [i + 4]) : : "memory");
    __asm__ __volatile__ ("cvtsd2ss %0, %%xmm5 # memcpy_r8_2_r4.c 22" : : "m" (d [i + 5]) : "%xmm5");
    __asm__ __volatile__ ("movss %%xmm5, %0 # memcpy_r8_2_r4.c 23" : "=m" (f [i + 5]) : : "memory");
    __asm__ __volatile__ ("cvtsd2ss %0, %%xmm6 # memcpy_r8_2_r4.c 24" : : "m" (d [i + 6]) : "%xmm6");
    __asm__ __volatile__ ("movss %%xmm6, %0 # memcpy_r8_2_r4.c 25" : "=m" (f [i + 6]) : : "memory");
    __asm__ __volatile__ ("cvtsd2ss %0, %%xmm7 # memcpy_r8_2_r4.c 26" : : "m" (d [i + 7]) : "%xmm7");
    __asm__ __volatile__ ("prefetcht0 %0 # memcpy_r8_2_r4.c 27" : : "m" (d [i + 256 + 8 + 0]) );
    __asm__ __volatile__ ("movss %%xmm7, %0 # memcpy_r8_2_r4.c 28" : "=m" (f [i + 7]) : : "memory");
    __asm__ __volatile__ ("cvtsd2ss %0, %%xmm8 # memcpy_r8_2_r4.c 29" : : "m" (d [i + 8]) : "%xmm8");
    __asm__ __volatile__ ("movss %%xmm8, %0 # memcpy_r8_2_r4.c 30" : "=m" (f [i + 8]) : : "memory");
    __asm__ __volatile__ ("cvtsd2ss %0, %%xmm9 # memcpy_r8_2_r4.c 31" : : "m" (d [i + 9]) : "%xmm9");
    __asm__ __volatile__ ("movss %%xmm9, %0 # memcpy_r8_2_r4.c 32" : "=m" (f [i + 9]) : : "memory");
    __asm__ __volatile__ ("cvtsd2ss %0, %%xmm10 # memcpy_r8_2_r4.c 33" : : "m" (d [i + 10]) : "%xmm10");
    __asm__ __volatile__ ("movss %%xmm10, %0 # memcpy_r8_2_r4.c 34" : "=m" (f [i + 10]) : : "memory");
    __asm__ __volatile__ ("cvtsd2ss %0, %%xmm11 # memcpy_r8_2_r4.c 35" : : "m" (d [i + 11]) : "%xmm11");
    __asm__ __volatile__ ("movss %%xmm11, %0 # memcpy_r8_2_r4.c 36" : "=m" (f [i + 11]) : : "memory");
    __asm__ __volatile__ ("cvtsd2ss %0, %%xmm12 # memcpy_r8_2_r4.c 37" : : "m" (d [i + 12]) : "%xmm12");
    __asm__ __volatile__ ("movss %%xmm12, %0 # memcpy_r8_2_r4.c 38" : "=m" (f [i + 12]) : : "memory");
    __asm__ __volatile__ ("cvtsd2ss %0, %%xmm13 # memcpy_r8_2_r4.c 39" : : "m" (d [i + 13]) : "%xmm13");
    __asm__ __volatile__ ("movss %%xmm13, %0 # memcpy_r8_2_r4.c 40" : "=m" (f [i + 13]) : : "memory");
    __asm__ __volatile__ ("cvtsd2ss %0, %%xmm14 # memcpy_r8_2_r4.c 41" : : "m" (d [i + 14]) : "%xmm14");
    __asm__ __volatile__ ("movss %%xmm14, %0 # memcpy_r8_2_r4.c 42" : "=m" (f [i + 14]) : : "memory");
    __asm__ __volatile__ ("cvtsd2ss %0, %%xmm15 # memcpy_r8_2_r4.c 43" : : "m" (d [i + 15]) : "%xmm15");
    __asm__ __volatile__ ("movss %%xmm15, %0 # memcpy_r8_2_r4.c 44" : "=m" (f [i + 15]) : : "memory");
  }
}
287
288 // Debug routine
289 countBufs(int nbufs)
290 {
291 int nInUse, nFree;
292 bufHdr_t *bufPtr;
293 static int target = -1;
294
295 if (-1 == target) {
296 ASSERT(-1 != nbufs);
297 target = nbufs;
298 }
299
300 bufPtr = freeTileBufs_ptr;
301 for (nFree = 0; bufPtr != NULL; bufPtr = bufPtr->next) nFree += 1;
302
303 bufPtr = inUseTileBufs_ptr;
304 for (nInUse = 0; bufPtr != NULL; bufPtr = bufPtr->next) nInUse += 1;
305
306 if (nInUse + nFree != target) {
307 fprintf(stderr, "Rank %d: bad number of buffs: free %d, inUse %d, should be %d\n",
308 ioIntracommRank, nFree, nInUse, target);
309 sleep(5000);
310 }
311 }
312
// Read exactly nbytes from fd into p, looping over short reads.
// Returns the number of bytes actually read (less than nbytes only
// at EOF), or the failing read()'s negative return value on error.
// Fix: retry on EINTR so an interrupting signal cannot cause a
// spurious short read -- this matches the EINTR handling already
// present in writen() below.
int readn(int fd, void *p, int nbytes)
{

  char *ptr = (char*)(p);

  int nleft, nread;

  nleft = nbytes;
  while (nleft > 0){
    nread = read(fd, ptr, nleft);
    if (nread < 0){
      if (errno==EINTR) continue; // interrupted; retry (POSIX, matches writen)
      return(nread); // error
    }
    else if (nread == 0)
      break; // EOF

    nleft -= nread;
    ptr += nread;
  }
  return (nbytes - nleft);
}
333
334
// Write all nbytes from p to fd, looping over short writes and
// retrying when the call is interrupted by a signal (EINTR).
// Returns nbytes on success, or the failing write()'s return value.
ssize_t writen(int fd, void *p, size_t nbytes)
{
  char *cursor = (char*)(p);
  size_t remaining = nbytes;

  while (remaining > 0){
    ssize_t put = write(fd, cursor, remaining);
    if (put <= 0){
      if (errno==EINTR) continue; // POSIX, not SVr4
      return(put); // non-EINTR error
    }
    remaining -= put;
    cursor += put;
  }
  return(nbytes - remaining);
}
355
356
357 void write_pickup_meta(FILE *fp, int gcmIter, int pickup)
358 {
359 int i;
360 int ndims = 2;
361 int nrecords,nfields;
362 char**f;
363 char*fld1[] = {"Uvel","Vvel","Theta","Salt","GuNm1","GvNm1","EtaN","dEtaHdt","EtaH"};
364 char*fld2[] = {"siTICES","siAREA","siHEFF","siHSNOW","siHSALT","siUICE","siVICE"};
365 // for now, just list the fields here. When the whole field specification apparatus
366 // is cleaned up, pull the names out of the epochstyle definition or whatever
367
368 ASSERT(1==pickup||2==pickup);
369
370 if (1==pickup){
371 nrecords = 6*NUM_Z+3;
372 nfields = sizeof(fld1)/sizeof(char*);
373 f = fld1;
374 }
375 else if (2==pickup){
376 nrecords = MULTDIM+6;
377 nfields = sizeof(fld2)/sizeof(char*);
378 f = fld2;
379 }
380
381 fprintf(fp," nDims = [ %3d ];\n",ndims);
382 fprintf(fp," dimList = [\n");
383 fprintf(fp," %10u,%10d,%10u,\n",NUM_X,1,NUM_X);
384 fprintf(fp," %10ld,%10d,%10ld\n",NUM_Y,1,NUM_Y);
385 fprintf(fp," ];\n");
386 fprintf(fp," dataprec = [ 'float64' ];\n");
387 fprintf(fp," nrecords = [ %5d ];\n",nrecords);
388 fprintf(fp," timeStepNumber = [ %10d ];\n",gcmIter);
389 fprintf(fp," timeInterval = [ %19.12E ];\n",0.0); // what should this be?
390 fprintf(fp," nFlds = [ %4d ];\n",nfields);
391 fprintf(fp," fldList = {\n");
392 for (i=0;i<nfields;++i)
393 fprintf(fp," '%-8s'",f[i]);
394 fprintf(fp,"\n };\n");
395 }
396
397
398
399 double *outBuf=NULL;//[NUM_X*NUM_Y*NUM_Z]; // only needs to be myNumZSlabs
400 size_t outBufSize=0;
401
402
// Write this rank's reassembled z-slab range [firstZ, firstZ+numZ)
// of the given field, for model iteration gcmIter, into the field's
// output file.  outBuf is byte-swapped in place first: 64-bit words
// for pickup output (doubles) or 32-bit words for normal output
// (floats, already converted by processSlabSection).
void
do_write(int io_epoch, fieldInfoThisEpoch_t *whichField, int firstZ, int numZ, int gcmIter)
{
  if (0==numZ) return; // this is *not* global NUM_Z : change name of parameter to avoid grief!

  int pickup = whichField->pickup;

  ///////////////////////////////
  // swap here, if necessary
  // int i,j;

  //i = NUM_X*NUM_Y*numZ;
  //mds_byteswapr8_(&i,outBuf);

  // mds_byteswapr8 expects an integer count, which is gonna overflow someday
  // can't redefine to long without affecting a bunch of other calls
  // so do a slab at a time here, to delay the inevitable
  // i = NUM_X*NUM_Y;
  //for (j=0;j<numZ;++j)
  //  mds_byteswapr8_(&i,&outBuf[i*j]);

  // gnu builtin evidently honored by intel compilers

  if (pickup) {
    // pickup files are double precision: swap 64-bit words in place
    uint64_t *alias = (uint64_t*)outBuf;
    size_t i;
    for (i=0;i<NUM_X*NUM_Y*numZ;++i)
      alias[i] = __builtin_bswap64(alias[i]);
  }
  else {
    // normal dumps are single precision: swap 32-bit words in place
    uint32_t *alias = (uint32_t*)outBuf;
    size_t i;
    for (i=0;i<NUM_X*NUM_Y*numZ;++i)
      alias[i] = __builtin_bswap32(alias[i]);
  }

  // end of swappiness
  //////////////////////////////////

  char s[1024];
  //sprintf(s,"henze_%d_%d_%c.dat",io_epoch,gcmIter,whichField->dataFieldID);

  // Build the output filename from the field's template (iter, suffix)
  sprintf(s,whichField->filenameTemplate,gcmIter,"data");

  // O_CREAT without O_TRUNC: several i/o ranks write disjoint slabs
  // of the same file, so the file must not be truncated here.
  int fd = open(s,O_CREAT|O_WRONLY,S_IRWXU|S_IRGRP);
  ASSERT(fd!=-1);

  size_t nbytes;

  if (pickup) {
    // Seek to the field's byte offset within the shared pickup file,
    // then to this rank's first z level within the field.
    lseek(fd,whichField->offset,SEEK_SET);
    lseek(fd,firstZ*NUM_X*NUM_Y*datumSize,SEEK_CUR);
    nbytes = NUM_X*NUM_Y*numZ*datumSize;
  }
  else {
    // Normal dump: one field per file, so only the z-level seek is
    // needed (SEEK_CUR from the initial position 0).
    lseek(fd,firstZ*NUM_X*NUM_Y*sizeof(float),SEEK_CUR);
    nbytes = NUM_X*NUM_Y*numZ*sizeof(float);
  }

  ssize_t bwrit = writen(fd,outBuf,nbytes);

  if (-1==bwrit) perror("Henze : file write problem : ");

  // NOTE(review): %d with size_t/ssize_t args -- only compiled when
  // DEBUG >= 2 (FPRINTF is a no-op otherwise); should be %zd/%zu.
  FPRINTF(stderr,"Wrote %d of %d bytes (%d -> %d) \n",bwrit,nbytes,firstZ,numZ);

  // ASSERT(nbytes == bwrit);

  // A short write is reported but not retried or treated as fatal.
  if (nbytes!=bwrit)
    fprintf(stderr,"WROTE %ld /%ld\n",bwrit,nbytes);

  close(fd);


  return;
}
478
479
480 int NTILES = -1;
481
482 typedef struct {
483 int off;
484 int skip;
485 } tile_layout_t;
486
487 tile_layout_t *offsetTable;
488
// Place one redistributed z-slab section of tile 'tileID' into the
// global output buffer outBuf, stripping the ghost cells.  'data'
// holds myNumZSlabs ghosted z levels of the tile.  For pickup output
// the values stay double; otherwise each row is converted to float
// via memcpy_r8_2_r4.
void
processSlabSection(
    fieldInfoThisEpoch_t *whichField,
    int tileID,
    void *data,
    int myNumZSlabs)
{
  int intracommSize,intracommRank;
  // MPI_Comm_size(whichField->ioRanksIntracomm, &intracommSize);
  MPI_Comm_rank(whichField->ioRanksIntracomm, &intracommRank);
  //printf("i/o rank %d/%d recv'd %d::%d (%d->%d)\n",intracommRank,intracommSize,whichField,tileID,firstZ,lastZ);
  // printf("rank %d : tile %d is gonna go at %d and stride with %d, z = %d\n",intracommRank,tileID,offsetTable[tileID].off,
  //	 offsetTable[tileID].skip,myNumZSlabs);

  int z;

  int pickup = whichField->pickup;

  //ASSERT((tileID > 0) && (tileID < (sizeof(offsetTable)/sizeof(tile_layout_t))));
  ASSERT( (tileID > 0) && (tileID <= NTILES) ); // offsetTable now dynamically allocated

  // Grow the shared output buffer if this field needs more space than
  // whatever a previous field left allocated.
  if (myNumZSlabs * twoDFieldSizeInBytes > outBufSize){

    free(outBuf);

    outBufSize = myNumZSlabs * twoDFieldSizeInBytes;

    outBuf = malloc(outBufSize);
    ASSERT(outBuf);

    memset(outBuf,0,outBufSize);
  }

  for (z=0;z<myNumZSlabs;++z){

    // Destination offsets are in elements; the element size differs
    // between pickup (double) and normal (float) output, hence the
    // two dst computations below.
    off_t zoffset = z*TILE_X*TILE_Y*NTILES; //NOT totalNumTiles;
    off_t hoffset = offsetTable[tileID].off;
    off_t skipdst = offsetTable[tileID].skip;

    //double *dst = outBuf + zoffset + hoffset;
    void *dst;
    if (pickup)
      dst = outBuf + zoffset + hoffset;
    else
      dst = (float*)outBuf + zoffset + hoffset;

    // Source offsets skip the ghost halo around each tile z level.
    // NOTE(review): the "+ YGHOSTS" term in hoff looks like it should
    // be "+ XGHOSTS"; the two are equivalent only if the X and Y ghost
    // widths are equal -- confirm against the compute-rank layout.
    off_t zoff = z*(TILE_X+2*XGHOSTS)*(TILE_Y+2*YGHOSTS);
    off_t hoff = YGHOSTS*(TILE_X+2*XGHOSTS) + YGHOSTS;
    double *src = (double*)data + zoff + hoff;

    off_t skipsrc = TILE_X+2*XGHOSTS;

    //fprintf(stderr,"rank %d tile %d offset %d skip %d dst %x src %x\n",intracommRank,tileID,hoffset,skipdst,dst,src);

    long long n = TILE_X;

    // Copy one interior row at a time, striding past ghost columns in
    // the source and past the other tiles' columns in the destination.
    int y;
    if (pickup)
      for (y=0;y<TILE_Y;++y)
        memcpy((double*)dst + y * skipdst, src + y * skipsrc, TILE_X*datumSize);
    else
      for (y=0;y<TILE_Y;++y)
        memcpy_r8_2_r4((float*)dst + y * skipdst, src + y * skipsrc, &n);
  }

  return;
}
556
557
558
559 // Allocate some buffers to receive tile-data from the compute ranks
560 void
561 allocateTileBufs(int numTileBufs, int maxIntracommSize)
562 {
563
564 ASSERT(tileOneZLevelSizeInBytes>0); // be sure we have rec'vd values by now
565
566 int i, j;
567 for (i = 0; i < numTileBufs; ++i) {
568
569 bufHdr_t *newBuf = malloc(sizeof(bufHdr_t));
570 ASSERT(NULL != newBuf);
571
572 newBuf->payload = malloc(tileOneZLevelSizeInBytes * NUM_Z);
573 ASSERT(NULL != newBuf->payload);
574
575 newBuf->requests = malloc(maxIntracommSize * sizeof(MPI_Request));
576 ASSERT(NULL != newBuf->requests);
577
578 // Init some values
579 newBuf->requestsArraySize = maxIntracommSize;
580 for (j = 0; j < maxIntracommSize; ++j) {
581 newBuf->requests[j] = MPI_REQUEST_NULL;
582 }
583 newBuf->ck0 = newBuf->ck1 = 0xdeadbeefdeadbeef;
584
585 // Put the buf into the free list
586 newBuf->state = bufState_Free;
587 newBuf->next = freeTileBufs_ptr;
588 freeTileBufs_ptr = newBuf;
589 }
590 }
591
592
593
594 bufHdr_t *
595 getFreeBuf()
596 {
597 int j;
598 bufHdr_t *rtnValue = freeTileBufs_ptr;
599
600 if (NULL != rtnValue) {
601 ASSERT(bufState_Free == rtnValue->state);
602 freeTileBufs_ptr = rtnValue->next;
603 rtnValue->next = NULL;
604
605 // Paranoia. This should already be the case
606 for (j = 0; j < rtnValue->requestsArraySize; ++j) {
607 rtnValue->requests[j] = MPI_REQUEST_NULL;
608 }
609 }
610 return rtnValue;
611 }
612
613
614 /////////////////////////////////////////////////////////////////
615
616
// Non-blocking attempt to receive one complete data tile from a
// compute rank on dataIntercomm.  Returns the buffer holding the
// payload and sets *tileID (decoded from the message tag), or NULL
// if nothing is pending or no free buffer is available.  The tag's
// low numCheckBits carry the epoch number and are checked against
// epochID; the remaining high bits are the tile ID.
bufHdr_t *
tryToReceiveDataTile(
    MPI_Comm dataIntercomm,
    int epochID,
    size_t expectedMsgSize,
    int *tileID)
{
  bufHdr_t *bufHdr;
  int pending, i, count;   // (i is unused here)
  MPI_Status mpiStatus;

  // Probe (non-blocking) for any incoming tile message
  MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, dataIntercomm,
             &pending, &mpiStatus);

  // If no data are pending, or if we can't get a buffer to
  // put the pending data into, return NULL
  if (!pending) return NULL;
  if (((bufHdr = getFreeBuf()) == NULL)) {
    FPRINTF(stderr,"tile %d(%d) pending, but no buf to recv it\n",
            mpiStatus.MPI_TAG, mpiStatus.MPI_TAG >> numCheckBits);
    return NULL;
  }

  // Do sanity checks on the pending message
  MPI_Get_count(&mpiStatus, MPI_BYTE, &count);
  ASSERT(count == expectedMsgSize);
  ASSERT((mpiStatus.MPI_TAG & bitMask) == (epochID & bitMask));

  // Recieve the data we saw in the iprobe, matching its exact
  // source and tag so we get that same message
  MPI_Recv(bufHdr->payload, expectedMsgSize, MPI_BYTE,
           mpiStatus.MPI_SOURCE, mpiStatus.MPI_TAG,
           dataIntercomm, &mpiStatus);
  bufHdr->state = bufState_InUse;

  // Overrun check: the sentinels flanking the payload pointer must
  // still hold their allocation-time pattern
  ASSERT(bufHdr->ck0==0xdeadbeefdeadbeef);
  ASSERT(bufHdr->ck0==bufHdr->ck1);

  // Return values
  *tileID = mpiStatus.MPI_TAG >> numCheckBits;


  FPRINTF(stderr, "recv tile %d(%d) from compute rank %d\n",
          mpiStatus.MPI_TAG, *tileID, mpiStatus.MPI_SOURCE);

  return bufHdr;
}
664
665
666
667 int
668 tryToReceiveZSlab(
669 void *buf,
670 int expectedMsgSize,
671 MPI_Comm intracomm)
672 {
673 MPI_Status mpiStatus;
674 int pending, count;
675
676 MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, intracomm, &pending, &mpiStatus);
677 if (!pending) return -1;
678
679 MPI_Get_count(&mpiStatus, MPI_BYTE, &count);
680 ASSERT(count == expectedMsgSize);
681
682 MPI_Recv(buf, count, MPI_BYTE, mpiStatus.MPI_SOURCE,
683 mpiStatus.MPI_TAG, intracomm, &mpiStatus);
684
685 FPRINTF(stderr, "recv slab %d from rank %d\n",
686 mpiStatus.MPI_TAG, mpiStatus.MPI_SOURCE);
687
688 // return the tileID
689 return mpiStatus.MPI_TAG;
690 }
691
692
693
// Slice one received tile into z-slab pieces and Isend each piece to
// consecutive ranks (0, 1, 2, ...) of the field's i/o intracomm; the
// message tag carries the tileID.  The Isend request handles are
// parked in bufHdr->requests so the buffer can be recycled once they
// all complete (checked with MPI_Testsome in doNewEpoch).
void
redistributeZSlabs(
    bufHdr_t *bufHdr,
    int tileID,
    int zSlabsPer,
    int thisFieldNumZ,
    MPI_Comm intracomm)
{
  int pieceSize = zSlabsPer * tileOneZLevelSizeInBytes;
  int tileSizeInBytes = tileOneZLevelSizeInBytes * thisFieldNumZ;
  int offset = 0;
  int recvRank = 0;

  // Full-sized pieces, one per receiving rank
  while ((offset + pieceSize) <= tileSizeInBytes) {

    MPI_Isend(bufHdr->payload + offset, pieceSize, MPI_BYTE, recvRank,
              tileID, intracomm, &(bufHdr->requests[recvRank]));
    ASSERT(MPI_REQUEST_NULL != bufHdr->requests[recvRank]);

    offset += pieceSize;
    recvRank += 1;
  }

  // There might be one last odd-sized piece (when thisFieldNumZ does
  // not divide evenly among the ranks)
  if (offset < tileSizeInBytes) {
    pieceSize = tileSizeInBytes - offset;
    ASSERT(pieceSize % tileOneZLevelSizeInBytes == 0);

    MPI_Isend(bufHdr->payload + offset, pieceSize, MPI_BYTE, recvRank,
              tileID, intracomm, &(bufHdr->requests[recvRank]));
    ASSERT(MPI_REQUEST_NULL != bufHdr->requests[recvRank]);

    offset += pieceSize;
    recvRank += 1;
  }

  // Sanity check: every unused request slot must still be null.
  // (NOTE: the ASSERT below has no trailing semicolon; the macro's
  // if-body already forms a complete statement.)
  ASSERT(recvRank <= bufHdr->requestsArraySize);
  while (recvRank < bufHdr->requestsArraySize) {
    ASSERT(MPI_REQUEST_NULL == bufHdr->requests[recvRank++])
  }
}
736
737
738
739 /////////////////////////////////////////////////////////////////
740 // This is called by the i/o ranks
// Run one i/o epoch on an i/o rank: receive this rank's assigned
// field's tiles, redistribute z-slabs within the field's intracomm,
// reassemble this rank's slab range in outBuf, and write it out.
//   epochID         - low numCheckBits are matched against incoming tags
//   epochStyleIndex - index into epochStyles[] (dump / pickup / seaice)
//   gcmIter         - model iteration number, used in output filenames
void
doNewEpoch(int epochID, int epochStyleIndex, int gcmIter)
{
  // In an i/o epoch, the i/o ranks are partitioned into groups,
  // each group dealing with one field. The ranks within a group:
  // (1) receive data tiles from the compute ranks
  // (2) slice the received data tiles into slabs in the 'z'
  // dimension, and redistribute the tile-slabs among the group.
  // (3) receive redistributed tile-slabs, and reconstruct a complete
  // field-slab for the whole field.
  // (4) Write the completed field-slab to disk.

  fieldInfoThisEpoch_t *fieldInfo;
  int intracommRank, intracommSize;

  int zSlabsPer;
  int myNumZSlabs, myFirstZSlab, myLastZSlab, myNumSlabPiecesToRecv;

  int numTilesRecvd, numSlabPiecesRecvd;


  // Find the descriptor for my assigned field for this epoch style.
  // It is the one whose dataIntercomm is not null
  fieldInfo = epochStyles[epochStyleIndex];
  while (MPI_COMM_NULL == fieldInfo->dataIntercomm) ++fieldInfo;
  ASSERT('\0' != fieldInfo->dataFieldID);



  MPI_Comm_rank(fieldInfo->ioRanksIntracomm, &intracommRank);
  MPI_Comm_size(fieldInfo->ioRanksIntracomm, &intracommSize);


  // Compute which z slab(s) we will be reassembling.
  zSlabsPer = divCeil(fieldInfo->zDepth, intracommSize);
  myNumZSlabs = zSlabsPer;

  // Adjust myNumZSlabs in case it didn't divide evenly
  myFirstZSlab = intracommRank * myNumZSlabs;
  if (myFirstZSlab >= fieldInfo->zDepth) {
    myNumZSlabs = 0;   // no slabs left for this rank
  } else if ((myFirstZSlab + myNumZSlabs) > fieldInfo->zDepth) {
    myNumZSlabs = fieldInfo->zDepth - myFirstZSlab;   // short final piece
  } else {
    myNumZSlabs = zSlabsPer;
  }
  // (myLastZSlab is computed for completeness; not used below)
  myLastZSlab = myFirstZSlab + myNumZSlabs - 1;

  // If we were not assigned any z-slabs, we don't get any redistributed
  // tile-slabs. If we were assigned one or more slabs, we get
  // redistributed tile-slabs for every tile.
  myNumSlabPiecesToRecv = (0 == myNumZSlabs) ? 0 : totalNumTiles;


  numTilesRecvd = 0;
  numSlabPiecesRecvd = 0;

  ////////////////////////////////////////////////////////////////////
  // Main loop. Handle tiles from the current epoch
  for(;;) {
    bufHdr_t *bufHdr;

    ////////////////////////////////////////////////////////////////
    // Check for tiles from the computational tasks
    while (numTilesRecvd < fieldInfo->tileCount) {
      int tileID = -1;
      size_t msgSize = tileOneZLevelSizeInBytes * fieldInfo->zDepth;
      bufHdr = tryToReceiveDataTile(fieldInfo->dataIntercomm, epochID,
                                    msgSize, &tileID);
      if (NULL == bufHdr) break; // No tile was received

      numTilesRecvd += 1;
      redistributeZSlabs(bufHdr, tileID, zSlabsPer,
                         fieldInfo->zDepth, fieldInfo->ioRanksIntracomm);

      // Add the bufHdr to the "in use" list (its Isends are pending)
      bufHdr->next = inUseTileBufs_ptr;
      inUseTileBufs_ptr = bufHdr;


    }


    ////////////////////////////////////////////////////////////////
    // Check for tile-slabs redistributed from the i/o processes
    while (numSlabPiecesRecvd < myNumSlabPiecesToRecv) {
      int msgSize = tileOneZLevelSizeInBytes * myNumZSlabs;
      char data[msgSize];   // VLA: one slab piece (myNumZSlabs levels)

      int tileID = tryToReceiveZSlab(data, msgSize, fieldInfo->ioRanksIntracomm);
      if (tileID < 0) break; // No slab was received


      numSlabPiecesRecvd += 1;
      processSlabSection(fieldInfo, tileID, data, myNumZSlabs);


      // Can do the write here, or at the end of the epoch.
      // Probably want to make it asynchronous (waiting for
      // completion at the barrier at the start of the next epoch).
      //if (numSlabPiecesRecvd >= myNumSlabPiecesToRecv) {
      //    ASSERT(numSlabPiecesRecvd == myNumSlabPiecesToRecv);
      //    do_write(io_epoch,whichField,myFirstZSlab,myNumZSlabs);
      //}
    }


    ////////////////////////////////////////////////////////////////
    // Check if we can release any of the inUse buffers (i.e. the
    // isends we used to redistribute those z-slabs have completed).
    // We do this by detaching the inUse list, then examining each
    // element in the list, putting each element either into the
    // free list or back into the inUse list, as appropriate.
    bufHdr = inUseTileBufs_ptr;
    inUseTileBufs_ptr = NULL;
    while (NULL != bufHdr) {
      int count = 0;
      int completions[intracommSize];
      MPI_Status statuses[intracommSize];

      // Acquire the "next" bufHdr now, before we change anything.
      bufHdr_t *nextBufHeader = bufHdr->next;

      // Check to see if the Isend requests have completed for this buf
      MPI_Testsome(intracommSize, bufHdr->requests,
                   &count, completions, statuses);

      if (MPI_UNDEFINED == count) {
        // A return of UNDEFINED means that none of the requests
        // is still outstanding, i.e. they have all completed.
        // Put the buf on the free list.
        bufHdr->state = bufState_Free;
        bufHdr->next = freeTileBufs_ptr;
        freeTileBufs_ptr = bufHdr;
        FPRINTF(stderr,"Rank %d freed a tile buffer\n", intracommRank);
      } else {
        // At least one request is still outstanding.
        // Put the buf on the inUse list.
        bufHdr->next = inUseTileBufs_ptr;
        inUseTileBufs_ptr = bufHdr;
      }

      // Continue with the next buffer
      bufHdr = nextBufHeader;
    }


    ////////////////////////////////////////////////////////////////
    // Check if the epoch is complete: all tiles received, all slab
    // pieces received, and all redistribution Isends drained.
    if ((numTilesRecvd >= fieldInfo->tileCount) &&
        (numSlabPiecesRecvd >= myNumSlabPiecesToRecv) &&
        (NULL == inUseTileBufs_ptr))
    {
      ASSERT(numTilesRecvd == fieldInfo->tileCount);
      ASSERT(numSlabPiecesRecvd == myNumSlabPiecesToRecv);

      //fprintf(stderr,"rank %d %d %d %d\n",intracommRank,numTilesRecvd,numSlabPiecesRecvd,myNumZSlabs);

      // Ok, wrap up the current i/o epoch
      // Probably want to make this asynchronous (waiting for
      // completion at the start of the next epoch).
      do_write(epochID, fieldInfo, myFirstZSlab, myNumZSlabs, gcmIter);

      // The epoch is complete
      return;
    }
  }

}
910
911
912
913
914
915
916
// Main routine for the i/o tasks.  Callers DO NOT RETURN from
// this routine; they MPI_Finalize and exit.
void
ioRankMain(int totalNumTiles)
{
    // This is one of a group of i/o processes that will be receiving tiles
    // from the computational processes.  This same process is also
    // responsible for compositing slices of tiles together into one or
    // more complete slabs, and then writing those slabs out to disk.
    //
    // When a tile is received, it is cut up into slices, and each slice is
    // sent to the appropriate compositing task that is dealing with those
    // "z" level(s) (possibly including ourselves).  These are tagged with
    // the tile-id.  When we *receive* such a message (from ourselves, or
    // from any other process in this group), we unpack the data (stripping
    // the ghost cells), and put them into the slab we are assembling.  Once
    // a slab is fully assembled, it is written to disk.

    int i, size, count, numTileBufs;
    MPI_Status status;
    int currentEpochID = 0;  // id of the last epoch started (0 == none yet)
    int maxTileCount = 0, max3DTileCount = 0, maxIntracommSize = 0;

    int numComputeRanks, epochStyleIndex;
    MPI_Comm_remote_size(globalIntercomm, &numComputeRanks);


    ////////////////////////////////////////////////////////////
    //// get global tile info via bcast over the global intercom
    //// (rank 0 of the compute side sends a 9-int payload from
    //// myasyncio_set_global_sizes_: {nx,ny,nz,nt,nb,tx,ty,ox,oy})

    int data[9];

    int ierr = MPI_Bcast(data,9,MPI_INT,0,globalIntercomm);

    NTILES = data[3];    // total number of tiles
    TILE_X = data[5];    // tile interior extents
    TILE_Y = data[6];
    XGHOSTS = data[7];   // ghost-cell (overlap) widths
    YGHOSTS = data[8];

    tileOneZLevelItemCount = ((TILE_X + 2*XGHOSTS) * (TILE_Y + 2*YGHOSTS));
    tileOneZLevelSizeInBytes = (tileOneZLevelItemCount * datumSize); // also calculated in compute ranks, urghh

    // Per-tile layout arrays, broadcast from the compute side; used to
    // build the (1-based) offsetTable that maps a tile into the output domain.
    int *xoff = malloc(NTILES*sizeof(int));
    ASSERT(xoff);
    int *yoff = malloc(NTILES*sizeof(int));
    ASSERT(yoff);
    int *xskip = malloc(NTILES*sizeof(int));
    ASSERT(xskip);

    offsetTable = malloc((NTILES+1)*sizeof(tile_layout_t));
    ASSERT(offsetTable);

    ierr = MPI_Bcast(xoff,NTILES,MPI_INT,0,globalIntercomm);
    ierr = MPI_Bcast(yoff,NTILES,MPI_INT,0,globalIntercomm);
    ierr = MPI_Bcast(xskip,NTILES,MPI_INT,0,globalIntercomm);

    // Note: offsetTable is indexed by tileID, which is 1-based.
    for (i=0;i<NTILES;++i){
        offsetTable[i+1].off = NUM_X*(yoff[i]-1) + xoff[i] - 1;
        offsetTable[i+1].skip = xskip[i];
    }

    free(xoff);
    free(yoff);
    free(xskip);

    ///////////////////////////////////////////////////////////////


    // At this point, the multitude of various inter-communicators have
    // been created and stored in the various epoch "style" arrays.
    // The next step is to have the computational tasks "register" the
    // data tiles they will send.  In this version of the code, we only
    // track and store *counts*, not the actual tileIDs.
    for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
        fieldInfoThisEpoch_t *p = NULL;
        fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex];
        int thisIntracommSize;

        // Find the field we were assigned for this epoch style
        // (the one whose registrationIntercomm was created for us in f1).
        int curFieldIndex = -1;
        while ('\0' != thisEpochStyle[++curFieldIndex].dataFieldID) {
            p = &(thisEpochStyle[curFieldIndex]);
            if (MPI_COMM_NULL != p->registrationIntercomm) break;
        }
        ASSERT(NULL != p);
        ASSERT('\0' != p->dataFieldID);
        MPI_Comm_size(p->ioRanksIntracomm, &size);
        if (size > maxIntracommSize) maxIntracommSize = size;


        // Receive a message from *each* computational rank telling us
        // a count of how many tiles that rank will send during epochs
        // of this style.  Note that some of these counts may be zero.
        // The count is carried in the message *tag* (see f2).
        for (i = 0; i < numComputeRanks; ++i) {
            MPI_Recv(NULL, 0, MPI_BYTE, MPI_ANY_SOURCE, MPI_ANY_TAG,
                     p->registrationIntercomm, &status);
            p->tileCount += status.MPI_TAG;
        }
        if (p->tileCount > maxTileCount) maxTileCount = p->tileCount;
        if (p->zDepth > 1) {
            if (p->tileCount > max3DTileCount) max3DTileCount = p->tileCount;
        }

        // Sanity check: summed over the i/o ranks for this field, the
        // registered tiles must cover the whole domain exactly once.
        MPI_Allreduce (&(p->tileCount), &count, 1,
                       MPI_INT, MPI_SUM, p->ioRanksIntracomm);
        ASSERT(count == totalNumTiles);
    }


    // In some as-of-yet-undetermined fashion, we decide how many
    // buffers to allocate to hold tiles received from the computational
    // tasks.  The number of such buffers is more-or-less arbitrary (the
    // code should be able to function with any number greater than zero).
    // More buffers means more parallelism, but uses more space.
    // Note that maxTileCount is an upper bound on the number of buffers
    // that we can usefully use.  Note also that if we are assigned a 2D
    // field, we might be assigned to recieve a very large number of tiles
    // for that field in that epoch style.

    // Hack - for now, just pick a value for numTileBufs
    numTileBufs = (max3DTileCount > 0) ? max3DTileCount : maxTileCount;
    if (numTileBufs < 4) numTileBufs = 4;
    if (numTileBufs > 15) numTileBufs = 15;

    allocateTileBufs(numTileBufs, maxIntracommSize);
    countBufs(numTileBufs);


    ////////////////////////////////////////////////////////////////////
    // Main loop.  One iteration per command (i.e. per i/o epoch),
    // terminated by an explicit cmd_exit from the compute side.
    ////////////////////////////////////////////////////////////////////

    for (;;) {
        int cmd[4];  // {command, epochID, epochStyle, gcmIter}

        // Make sure all the i/o ranks have completed processing the prior
        // cmd (i.e. the prior i/o epoch is complete).
        MPI_Barrier(ioIntracomm);

        if (0 == ioIntracommRank) {
            fprintf(stderr, "I/O ranks waiting for new epoch\n");
            // Ack completion of the prior epoch to compute rank 0 ...
            MPI_Send(NULL, 0, MPI_BYTE, 0, cmd_epochComplete, globalIntercomm);

            // ... then wait for the next command.
            // NOTE(review): f4 sends the exit command with count 3, so
            // cmd[3] is uninitialized on that path; it is only read for
            // cmd_newEpoch.  MPI permits a recv count larger than the send.
            MPI_Recv(cmd, 4, MPI_INT, 0, 0, globalIntercomm, MPI_STATUS_IGNORE);
            fprintf(stderr, "I/O ranks begining new epoch: %d, gcmIter = %d\n", cmd[1],cmd[3]);

            // before we start a new epoch, have i/o rank 0:
            //   determine output filenames for this epoch
            //   clean up any extant files with same names
            //   write .meta files

            if (cmd_exit != cmd[0]){

                fprintf(stderr,"new epoch: epoch %d, style %d, gcmIter %d\n", cmd[1],cmd[2],cmd[3]);

                int epochStyle = cmd[2];
                int gcmIter = cmd[3];

                fieldInfoThisEpoch_t *fieldInfo;
                fieldInfo = epochStyles[epochStyle];
                char s[1024];
                int res;
                FILE *fp;

                if (fieldInfo->pickup==0){ // for non-pickups, need to loop over individual fields
                    char f;
                    // Assignment intended: iterate until the '\0' field-ID
                    // terminator.  'f' itself is otherwise unused.
                    while (f = fieldInfo->dataFieldID){
                        sprintf(s,fieldInfo->filenameTemplate,gcmIter,"data");
                        fprintf(stderr,"%s\n",s);
                        // Remove any stale output file; ENOENT is expected.
                        res = unlink(s);
                        if (-1==res && ENOENT!=errno) fprintf(stderr,"unable to rm %s\n",s);

                        // skip writing meta files for non-pickup fields

                        ++fieldInfo;

                    }
                }

                else { // single pickup or pickup_seaice file

                    sprintf(s,fieldInfo->filenameTemplate,gcmIter,"data");
                    fprintf(stderr,"%s\n",s);
                    res = unlink(s);
                    if (-1==res && ENOENT!=errno) fprintf(stderr,"unable to rm %s\n",s);

                    sprintf(s,fieldInfo->filenameTemplate,gcmIter,"meta");
                    fp = fopen(s,"w+");
                    write_pickup_meta(fp, gcmIter, fieldInfo->pickup);
                    fclose(fp);

                }
            }
        }
        // Propagate the command from i/o rank 0 to all i/o ranks.
        MPI_Bcast(cmd, 4, MPI_INT, 0, ioIntracomm);

        switch (cmd[0]) {

          case cmd_exit:
            // Don't bother trying to disconnect and free the
            // plethora of communicators; just exit.
            FPRINTF(stderr,"Received shutdown message\n");
            MPI_Finalize();
            exit(0);
            break;

          case cmd_newEpoch:
            // Epochs must arrive strictly in sequence.
            if ((currentEpochID + 1) != cmd[1]) {
                fprintf(stderr, "ERROR: Missing i/o epoch? was %d, "
                        "but is now %d ??\n", currentEpochID, cmd[1]);
                sleep(1);  // Give the message a chance to propagate
                abort();
            }
            currentEpochID = cmd[1];

            memset(outBuf,0,outBufSize); // zero the outBuf, so dry tiles are well defined

            doNewEpoch(cmd[1], cmd[2], cmd[3]);
            break;

          default:
            fprintf(stderr, "Unexpected epoch command: %d %d %d %d\n",
                    cmd[0], cmd[1], cmd[2], cmd[3]);
            sleep(1);
            abort();
            break;
        }

    }

}
1159
1160
1161
1162 ///////////////////////////////////////////////////////////////////////////////////
1163
1164 int
1165 findField(const char c)
1166 {
1167 int i;
1168 for (i = 0; i < numAllFields; ++i) {
1169 if (c == fieldDepths[i].dataFieldID) return i;
1170 }
1171
1172 // Give the error message a chance to propagate before exiting.
1173 fprintf(stderr, "ERROR: Field not found: '%c'\n", c);
1174 sleep(1);
1175 abort();
1176 }
1177
1178
1179
1180 // Given a number of ranks and a set of fields we will want to output,
1181 // figure out how to distribute the ranks among the fields.
1182 void
1183 computeIORankAssigments(
1184 int numComputeRanks,
1185 int numIORanks,
1186 int numFields,
1187 fieldInfoThisEpoch_t *fields,
1188 int assignments[])
1189 {
1190 int i,j,k, sum, count;
1191
1192 int numIONodes = numIORanks / numRanksPerNode;
1193 int numIORanksThisField[numFields];
1194 long int bytesPerIORankThisField[numFields];
1195 int expectedMessagesPerRankThisField[numFields];
1196 long int fieldSizes[numFields];
1197
1198 struct ioNodeInfo_t {
1199 int expectedNumMessagesThisNode;
1200 int numCoresAssigned;
1201 int *assignedFieldThisCore;
1202 } ioNodeInfo[numIONodes];
1203
1204 // Since numRanksPerNode might be dynamically determined, we have
1205 // to dynamically allocate the assignedFieldThisCore arrays.
1206 for (i = 0; i < numIONodes; ++i) {
1207 ioNodeInfo[i].assignedFieldThisCore = malloc(sizeof(int)*numRanksPerNode);
1208 ASSERT(NULL != ioNodeInfo[i].assignedFieldThisCore);
1209 }
1210
1211 ASSERT((numIONodes*numRanksPerNode) == numIORanks);
1212
1213
1214 //////////////////////////////////////////////////////////////
1215 // Compute the size for each field in this epoch style
1216 for (i = 0; i < numFields; ++i) {
1217 ASSERT((1 == fields[i].zDepth) || (NUM_Z == fields[i].zDepth) || (MULTDIM == fields[i].zDepth));
1218 fieldSizes[i] = twoDFieldSizeInBytes * fields[i].zDepth;
1219 }
1220
1221
1222 /////////////////////////////////////////////////////////
1223 // Distribute the available i/o ranks among the fields,
1224 // proportionally based on field size.
1225
1226 // First, assign one rank per field
1227 ASSERT(numIORanks >= numFields);
1228 for (i = 0; i < numFields; ++i) {
1229 numIORanksThisField[i] = 1;
1230 bytesPerIORankThisField[i] = fieldSizes[i];
1231 }
1232
1233 // Now apportion any extra ranks
1234 for (; i < numIORanks; ++i) {
1235
1236 // Find the field 'k' with the most bytesPerIORank
1237 k = 0;
1238 for (j = 1; j < numFields; ++j) {
1239 if (bytesPerIORankThisField[j] > bytesPerIORankThisField[k]) {
1240 k = j;
1241 }
1242 }
1243
1244 // Assign an i/o rank to that field
1245 numIORanksThisField[k] += 1;
1246 bytesPerIORankThisField[k] = fieldSizes[k] / numIORanksThisField[k];
1247 }
1248
1249 ////////////////////////////////////////////////////////////
1250 // At this point, all the i/o ranks should be apportioned
1251 // among the fields. Check we didn't screw up the count.
1252 for (sum = 0, i = 0; i < numFields; ++i) {
1253 sum += numIORanksThisField[i];
1254 }
1255 ASSERT(sum == numIORanks);
1256
1257
1258
1259 //////////////////////////////////////////////////////////////////
1260 // The *number* of i/o ranks assigned to a field is based on the
1261 // field size. But the *placement* of those ranks is based on
1262 // the expected number of messages the process will receive.
1263 // [In particular, if we have several fields each with only one
1264 // assigned i/o rank, we do not want to place them all on the
1265 // same node if we don't have to.] This traffic-load balance
1266 // strategy is an approximation at best since the messages may
1267 // not all be the same size (e.g. 2D fields vs. 3D fields), so
1268 // "number of messages" is not the same thing as "traffic".
1269 // But it should suffice.
1270
1271 // Init a couple of things
1272 for (i = 0; i < numFields; ++i) {
1273 expectedMessagesPerRankThisField[i] =
1274 numComputeRanks / numIORanksThisField[i];
1275 }
1276 for (i = 0; i < numIONodes; ++i) {
1277 ioNodeInfo[i].expectedNumMessagesThisNode = 0;
1278 ioNodeInfo[i].numCoresAssigned = 0;
1279 for (j = 0; j < numRanksPerNode; ++j) {
1280 ioNodeInfo[i].assignedFieldThisCore[j] = -1;
1281 }
1282 }
1283
1284 ///////////////////////////////////////////////////////////////////
1285 // Select the i/o node with the smallest expectedNumMessages, and
1286 // assign it a rank from the field with the highest remaining
1287 // expectedMessagesPerRank. Repeat until everything is assigned.
1288 // (Yes, this could be a lot faster, but isn't worth the trouble.)
1289
1290 for (count = 0; count < numIORanks; ++count) {
1291
1292 // Make an initial choice of node
1293 for (i = 0; i < numIONodes; ++i) {
1294 if (ioNodeInfo[i].numCoresAssigned < numRanksPerNode) break;
1295 }
1296 j = i;
1297 ASSERT(j < numIONodes);
1298
1299 // Search for a better choice
1300 for (++i; i < numIONodes; ++i) {
1301 if (ioNodeInfo[i].numCoresAssigned >= numRanksPerNode) continue;
1302 if (ioNodeInfo[i].expectedNumMessagesThisNode <
1303 ioNodeInfo[j].expectedNumMessagesThisNode)
1304 {
1305 j = i;
1306 }
1307 }
1308
1309
1310 // Make an initial choice of field
1311 for (i = 0; i < numFields; ++i) {
1312 if (numIORanksThisField[i] > 0) break;
1313 }
1314 k = i;
1315 ASSERT(k < numFields);
1316
1317 // Search for a better choice
1318 for (++i; i < numFields; ++i) {
1319 if (numIORanksThisField[i] <= 0) continue;
1320 if (expectedMessagesPerRankThisField[i] >
1321 expectedMessagesPerRankThisField[k])
1322 {
1323 k = i;
1324 }
1325 }
1326
1327 // Put one rank from the chosen field onto the chosen node
1328 ioNodeInfo[j].expectedNumMessagesThisNode += expectedMessagesPerRankThisField[k];
1329 ioNodeInfo[j].assignedFieldThisCore[ioNodeInfo[j].numCoresAssigned] = k;
1330 ioNodeInfo[j].numCoresAssigned += 1;
1331 numIORanksThisField[k] -= 1;
1332 }
1333
1334 // Sanity check - all ranks were assigned to a core
1335 for (i = 1; i < numFields; ++i) {
1336 ASSERT(0 == numIORanksThisField[i]);
1337 }
1338 // Sanity check - all cores were assigned a rank
1339 for (i = 1; i < numIONodes; ++i) {
1340 ASSERT(numRanksPerNode == ioNodeInfo[i].numCoresAssigned);
1341 }
1342
1343
1344 /////////////////////////////////////
1345 // Return the computed assignments
1346 for (i = 0; i < numIONodes; ++i) {
1347 for (j = 0; j < numRanksPerNode; ++j) {
1348 assignments[i*numRanksPerNode + j] =
1349 ioNodeInfo[i].assignedFieldThisCore[j];
1350 }
1351 }
1352
1353 // Clean up
1354 for (i = 0; i < numIONodes; ++i) {
1355 free(ioNodeInfo[i].assignedFieldThisCore);
1356 }
1357
1358 }
1359
1360 //////////////////////////////////////////////////////////////////////////////////
1361
1362
1363
1364
1365 int
1366 isIORank(int commRank, int totalNumNodes, int numIONodes)
1367 {
1368 // Figure out if this rank is on a node that will do i/o.
1369 // Note that the i/o nodes are distributed throughout the
1370 // task, not clustered together.
1371 int ioNodeStride = totalNumNodes / numIONodes;
1372 int thisRankNode = commRank / numRanksPerNode;
1373 int n = thisRankNode / ioNodeStride;
1374 return (((n * ioNodeStride) == thisRankNode) && (n < numIONodes)) ? 1 : 0;
1375 }
1376
1377
// Find the number of *physical cores* on the current node
// (ignore hyperthreading).  This should work for pretty much
// any Linux based system (and fail horribly for anything else).
//
// The dynamic probe is currently disabled; a hard-wired value is
// returned instead.
int
getNumCores(void)
{
    return 20;  // until we figure out why this cratered

#if 0
    // Disabled probe: count unique (physical id, core id) pairs
    // reported by /proc/cpuinfo.
    char *magic = "cat /proc/cpuinfo | egrep \"core id|physical id\" | "
                  "tr -d \"\\n\" | sed s/physical/\\\\nphysical/g | "
                  "grep -v ^$ | sort -u | wc -l";

    FILE *fp = popen(magic,"r");
    ASSERT(fp);

    int rtnValue = -1;

    int res = fscanf(fp,"%d",&rtnValue);

    ASSERT(1==res);

    pclose(fp);

    ASSERT(rtnValue > 0);
    return rtnValue;
#endif
}
1404
1405
1406
1407 ////////////////////////////////////////////////////////////////////////
1408 ////////////////////////////////////////////////////////////////////////
1409 // Routines called by the mitGCM code
1410
1411
////////////////////////////////////////
// Various "one time" initializations.
//
// Splits the parent communicator into compute, i/o, and "excess"
// ranks, builds the per-epoch-style / per-field communicators, and
// launches the i/o ranks into ioRankMain (from which they never
// return).  Compute ranks return with their new intra-communicator
// in *rtnComputeComm.
void
f1(
    MPI_Comm parentComm,
    int numComputeRanks,
    int numTiles,
    MPI_Comm *rtnComputeComm)
{
    MPI_Comm newIntracomm, newIntercomm, dupParentComm;
    int newIntracommRank, thisIsIORank;
    int parentSize, parentRank;
    int numIONodes, numIORanks;
    int mpiIsInitialized, *intPtr, tagUBexists;
    int i, j, nF, compRoot, fieldIndex, epochStyleIndex;
    int totalNumNodes, numComputeNodes, newIntracommSize;

    // Init globals
    totalNumTiles = numTiles;
    if (numRanksPerNode <= 0) {
        // Might also want to check for an env var (or something)
        numRanksPerNode = getNumCores();
    }

    // Fill in the zDepth field of the fieldInfoThisEpoch_t descriptors,
    // cross-checking any value already present against fieldDepths.
    for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
        fieldInfoThisEpoch_t *f, *thisEpochStyle = epochStyles[epochStyleIndex];

        int curFieldIndex = -1;
        while ((f = &(thisEpochStyle[++curFieldIndex]))->dataFieldID != '\0') {
            i = findField(f->dataFieldID);
            if (-1 != f->zDepth) {
                // Mismatch is reported but not fatal; fieldDepths wins below.
                if (f->zDepth != fieldDepths[i].numZ) {
                    fprintf(stderr, "Inconsistent z-depth for field '%c': "
                            "fieldDepths[%d] and epoch style %d\n",
                            f->dataFieldID, i, epochStyleIndex);
                }
            }
            f->zDepth = fieldDepths[i].numZ;
        }
    }


    // Find the max MPI "tag" value (needed by f2/f3 to validate the
    // tileID<<numCheckBits tag encoding).
    MPI_Attr_get(MPI_COMM_WORLD, MPI_TAG_UB, &intPtr, &tagUBexists);
    ASSERT(tagUBexists);
    maxTagValue = *intPtr;


    // Info about the parent communicator
    MPI_Initialized(&mpiIsInitialized);
    ASSERT(mpiIsInitialized);
    MPI_Comm_size(parentComm, &parentSize);
    MPI_Comm_rank(parentComm, &parentRank);


    // Figure out how many nodes we can use for i/o.
    // To make things (e.g. memory calculations) simpler, we want
    // a node to have either all compute ranks, or all i/o ranks.
    // If numComputeRanks does not evenly divide numRanksPerNode, we have
    // to round up in favor of the compute side.

    totalNumNodes = divCeil(parentSize, numRanksPerNode);
    numComputeNodes = divCeil(numComputeRanks, numRanksPerNode);
    numIONodes = (parentSize - numComputeRanks) / numRanksPerNode;
    ASSERT(numIONodes > 0);
    ASSERT(numIONodes <= (totalNumNodes - numComputeNodes));
    numIORanks = numIONodes * numRanksPerNode;


    // It is surprisingly easy to launch the job with the wrong number
    // of ranks, particularly if the number of compute ranks is not a
    // multiple of numRanksPerNode (the tendency is to just launch one
    // rank per core for all available cores).  So we introduce a third
    // option for a rank: besides being an i/o rank or a compute rank,
    // it might instead be an "excess" rank, in which case it just
    // calls MPI_Finalize and exits.

    typedef enum { isCompute, isIO, isExcess } rankType;
    rankType thisRanksType;

    if (isIORank(parentRank, totalNumNodes, numIONodes)) {
        thisRanksType = isIO;
    } else if (parentRank >= numComputeRanks + numIORanks) {
        thisRanksType = isExcess;
    } else {
        thisRanksType = isCompute;
    }

    // Split the parent communicator into parts (one communicator per
    // rank type, ordered by parent rank).
    MPI_Comm_split(parentComm, thisRanksType, parentRank, &newIntracomm);
    MPI_Comm_rank(newIntracomm, &newIntracommRank);

    // Excess ranks disappear
    // N.B. "parentSize" still includes these ranks.
    if (isExcess == thisRanksType) {
        MPI_Finalize();
        exit(0);
    }

    // Sanity check
    MPI_Comm_size(newIntracomm, &newIntracommSize);
    if (isIO == thisRanksType) {
        ASSERT(newIntracommSize == numIORanks);
    } else {
        ASSERT(newIntracommSize == numComputeRanks);
    }


    // Create a special intercomm from the i/o and compute parts.
    // Each side needs the parent-comm rank of the *other* side's
    // lowest-ranked member as the remote leader.
    if (isIO == thisRanksType) {
        // Set globals
        ioIntracomm = newIntracomm;
        MPI_Comm_rank(ioIntracomm, &ioIntracommRank);

        // Find the lowest computation rank
        for (i = 0; i < parentSize; ++i) {
            if (!isIORank(i, totalNumNodes, numIONodes)) break;
        }
    } else { // isCompute
        // Set globals
        computeIntracomm = newIntracomm;
        MPI_Comm_rank(computeIntracomm, &computeIntracommRank);

        // Find the lowest IO rank
        for (i = 0; i < parentSize; ++i) {
            if (isIORank(i, totalNumNodes, numIONodes)) break;
        }
    }
    MPI_Intercomm_create(newIntracomm, 0, parentComm, i, 0, &globalIntercomm);



    ///////////////////////////////////////////////////////////////////
    // For each different i/o epoch style, split the i/o ranks among
    // the fields, and create an inter-communicator for each split.

    for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
        fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex];
        int fieldAssignments[numIORanks];
        int rankAssignments[numComputeRanks + numIORanks];

        // Count the number of fields in this epoch style
        for (nF = 0; thisEpochStyle[nF].dataFieldID != '\0'; ++nF) ;

        // Decide how to apportion the i/o ranks among the fields
        for (i=0; i < numIORanks; ++i) fieldAssignments[i] = -1;
        computeIORankAssigments(numComputeRanks, numIORanks, nF,
                                thisEpochStyle, fieldAssignments);
        // Sanity check
        for (i=0; i < numIORanks; ++i) {
            ASSERT((fieldAssignments[i] >= 0) && (fieldAssignments[i] < nF));
        }

        // Embed the i/o rank assignments into an array holding
        // the assignments for *all* the ranks (i/o and compute).
        // Rank assignment of "-1" means "compute".
        j = 0;
        for (i = 0; i < numComputeRanks + numIORanks; ++i) {
            rankAssignments[i] = isIORank(i, totalNumNodes, numIONodes) ?
                                 fieldAssignments[j++] : -1;
        }
        // Sanity check.  Check the assignment for this rank.
        if (isIO == thisRanksType) {
            ASSERT(fieldAssignments[newIntracommRank] == rankAssignments[parentRank]);
        } else {
            ASSERT(-1 == rankAssignments[parentRank]);
        }
        ASSERT(j == numIORanks);

        // Pretty-print the node/rank assignment map (rank 0 only).
        if (0 == parentRank) {
            printf("\ncpu assignments, epoch style %d\n", epochStyleIndex);
            for (i = 0; i < numComputeNodes + numIONodes ; ++i) {
                if (rankAssignments[i*numRanksPerNode] >= 0) {
                    // i/o node
                    for (j = 0; j < numRanksPerNode; ++j) {
                        printf(" %1d", rankAssignments[i*numRanksPerNode + j]);
                    }
                } else {
                    // compute node
                    for (j = 0; j < numRanksPerNode; ++j) {
                        if ((i*numRanksPerNode + j) >= (numComputeRanks + numIORanks)) break;
                        ASSERT(-1 == rankAssignments[i*numRanksPerNode + j]);
                    }
                    printf(" #");
                    for (; j < numRanksPerNode; ++j) { // "excess" ranks (if any)
                        printf("X");
                    }
                }
                printf(" ");
            }
            printf("\n\n");
        }

        // Find the lowest rank assigned to computation; use it as
        // the "root" for the upcoming intercomm creates.
        for (compRoot = 0; rankAssignments[compRoot] != -1; ++compRoot);
        // Sanity check
        if ((isCompute == thisRanksType) && (0 == newIntracommRank)) ASSERT(compRoot == parentRank);


        // If this is an I/O rank, split the newIntracomm (which, for
        // the i/o ranks, is a communicator among all the i/o ranks)
        // into the pieces as assigned.

        if (isIO == thisRanksType) {
            MPI_Comm fieldIntracomm;
            int myField = rankAssignments[parentRank];
            ASSERT(myField >= 0);

            MPI_Comm_split(newIntracomm, myField, parentRank, &fieldIntracomm);
            thisEpochStyle[myField].ioRanksIntracomm = fieldIntracomm;

            // Now create an inter-communicator between the computational
            // ranks, and each of the newly split off field communicators.
            for (i = 0; i < nF; ++i) {

                // Do one field at a time to avoid clashes on parentComm
                MPI_Barrier(newIntracomm);

                if (myField != i) continue;

                // Create the intercomm for this field for this epoch style
                MPI_Intercomm_create(fieldIntracomm, 0, parentComm,
                                     compRoot, i, &(thisEpochStyle[myField].dataIntercomm));

                // ... and dup a separate one for tile registrations
                MPI_Comm_dup(thisEpochStyle[myField].dataIntercomm,
                             &(thisEpochStyle[myField].registrationIntercomm));
            }
        }
        else {
            // This is a computational rank; create the intercommunicators
            // to the various split off separate field communicators.

            for (fieldIndex = 0; fieldIndex < nF; ++fieldIndex) {

                // Find the remote "root" process for this field
                int fieldRoot = -1;
                while (rankAssignments[++fieldRoot] != fieldIndex);

                // Create the intercomm for this field for this epoch style
                MPI_Intercomm_create(newIntracomm, 0, parentComm, fieldRoot,
                                     fieldIndex, &(thisEpochStyle[fieldIndex].dataIntercomm));

                // ... and dup a separate one for tile registrations
                MPI_Comm_dup(thisEpochStyle[fieldIndex].dataIntercomm,
                             &(thisEpochStyle[fieldIndex].registrationIntercomm));
            }
        }

    } // epoch style loop


    // I/O processes split off and start receiving data
    // NOTE: the I/O processes DO NOT RETURN from this call
    if (isIO == thisRanksType) ioRankMain(totalNumTiles);


    // The "compute" processes now return with their new intra-communicator.
    *rtnComputeComm = newIntracomm;

    // but first, call mpiio initialization routine!
    initSizesAndTypes();

    return;
}
1679
1680
1681
1682
1683 // "Register" the tile-id(s) that this process will be sending
void
f2(int tileID)
{
    // Register one tile (tileID >= 0) that this compute rank will send,
    // or — when tileID is negative — end registration and tell every i/o
    // rank of every field, for every epoch style, how many tiles to
    // expect from this rank (the count is carried in the message tag).
    int i, epochStyleIndex;

    // Running total of tiles registered by this rank.
    static int count = 0;

    // This code assumes that the same tileID will apply to all the
    // fields.  Note that we use the tileID as a tag, so we require it
    // be an int with a legal tag value, but we do *NOT* actually use
    // the tag *value* for anything (e.g. it is NOT used as an index
    // into an array).  It is treated as an opaque handle that is
    // passed from the compute task(s) to the compositing routines.
    // Those two end-points likely assign meaning to the tileID (i.e.
    // use it to identify where the tile belongs within the domain),
    // but that is their business.
    //
    // A tileID of -1 signifies the end of registration.

    ASSERT(computeIntracommRank >= 0);

    if (tileID >= 0) {
        // In this version of the code, in addition to the tileID, we also
        // multiplex in the low order bit(s) of the epoch number as an
        // error check.  So the tile value must be small enough to allow that.
        ASSERT(((tileID<<numCheckBits) + ((1<<numCheckBits)-1)) < maxTagValue);

        // In this version of the code, we do not send the actual tileID's
        // to the i/o processes during the registration procedure.  We only
        // send a count of the number of tiles that we will send.
        ++count;
        return;
    }


    // We get here when we are called with a negative tileID, signifying
    // the end of registration.  We now need to figure out and inform
    // *each* i/o process in *each* field just how many tiles we will be
    // sending them in *each* epoch style.

    for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
        fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex];

        int curFieldIndex = -1;
        while ('\0' != thisEpochStyle[++curFieldIndex].dataFieldID) {
            fieldInfoThisEpoch_t *thisField = &(thisEpochStyle[curFieldIndex]);
            int numRemoteRanks, *tileCounts, remainder;
            MPI_Comm_remote_size(thisField->dataIntercomm, &numRemoteRanks);

            // Stack allocation: freed automatically on return from f2.
            tileCounts = alloca(numRemoteRanks * sizeof(int));

            memset(tileCounts,0,numRemoteRanks * sizeof(int));

            // Distribute the tiles among the i/o ranks.
            for (i = 0; i < numRemoteRanks; ++i) {
                tileCounts[i] = count / numRemoteRanks;
            }
            // Deal with any remainder; start at an offset based on our own
            // rank so the extras do not all land on i/o rank 0.
            remainder = count - ((count / numRemoteRanks) * numRemoteRanks);
            for (i = 0; i < remainder; ++i) {
                int target = (computeIntracommRank + i) % numRemoteRanks;
                tileCounts[target] += 1;
            }

            // Communicate these counts to the i/o processes for this
            // field.  Note that we send a message to each process,
            // even if the count is zero.
            for (i = 0; i < numRemoteRanks; ++i) {
                MPI_Send(NULL, 0, MPI_BYTE, i, tileCounts[i],
                         thisField->registrationIntercomm);
            }

        } // field loop
    } // epoch-style loop

}
1760
1761
1762
1763
// Compute-side epoch bookkeeping: id of the epoch currently in
// progress, and the index (into epochStyles) of that epoch's style.
int currentEpochID = 0;
int currentEpochStyle = 0;
1766
// Start a new i/o epoch on the compute side.  Blocks until the i/o
// ranks have acknowledged completion of the prior epoch, then tells
// them to begin the new one and resets the per-field tile counters.
void
beginNewEpoch(int newEpochID, int gcmIter, int epochStyle)
{
    fieldInfoThisEpoch_t *p;

    // Epochs must be started strictly in sequence.
    if (newEpochID != (currentEpochID + 1)) {
        fprintf(stderr, "ERROR: Missing i/o epoch? was %d, is now %d ??\n",
                currentEpochID, newEpochID);
        sleep(1);  // Give the message a chance to propagate
        abort();
    }

    ////////////////////////////////////////////////////////////////////////
    // Need to be sure the i/o tasks are done processing the previous epoch
    // before any compute tasks start sending tiles from a new epoch.

    if (0 == computeIntracommRank) {
        int cmd[4] = { cmd_newEpoch, newEpochID, epochStyle, gcmIter };

        // Wait to get an ack that the i/o tasks are done with the prior epoch.
        MPI_Recv(NULL, 0, MPI_BYTE, 0, cmd_epochComplete,
                 globalIntercomm, MPI_STATUS_IGNORE);

        // Tell the i/o ranks about the new epoch.
        // (Just tell rank 0; it will bcast to the others)
        MPI_Send(cmd, 4, MPI_INT, 0, 0, globalIntercomm);
    }

    // Compute ranks wait here until rank 0 gets the ack from the i/o ranks
    MPI_Barrier(computeIntracomm);


    currentEpochID = newEpochID;
    currentEpochStyle = epochStyle;

    // Reset the tileCount (used by f3)
    for (p = epochStyles[currentEpochStyle]; p->dataFieldID != '\0'; ++p) {
        p->tileCount = 0;
    }
}
1807
1808
// Send one tile of field 'dataFieldID' for the given epoch to the
// appropriate i/o rank.  The destination is chosen round-robin
// (offset by this rank's id) over the i/o ranks serving the field;
// the MPI tag multiplexes the tileID with the epoch's low check bits.
void
f3(char dataFieldID, int tileID, int epochID, void *data)
{
    int whichField, receiver, tag;

    // Cache of the field looked up on the previous call; calls usually
    // arrive in runs of the same field/epoch.
    static char priorDataFieldID = '\0';
    static int priorEpoch = -1;
    static int remoteCommSize;
    static fieldInfoThisEpoch_t *p;

    // Set to 1 iff a fresh field lookup was done (debugging aid).
    int flag=0;

    // Check that this global has been set
    ASSERT(computeIntracommRank >= 0);


    // Figure out some info about this dataFieldID.  It is
    // likely to be another tile from the same field as last time
    // we were called, in which case we can reuse the "static" values
    // retained from the prior call.  If not, we need to look it up.

    if ((dataFieldID != priorDataFieldID) || (epochID != priorEpoch)) {

        // It's not the same; we need to look it up.

        for (p = epochStyles[currentEpochStyle]; p->dataFieldID != '\0'; ++p) {
            if (p->dataFieldID == dataFieldID) break;
        }
        // Make sure we found a valid field

        ASSERT(p->dataIntercomm != MPI_COMM_NULL);

        MPI_Comm_remote_size(p->dataIntercomm, &remoteCommSize);

        flag=1;

    }

    ASSERT(p->dataFieldID == dataFieldID);

    // Round-robin choice of i/o rank; p->tileCount advances per tile sent.
    receiver = (computeIntracommRank + p->tileCount) % remoteCommSize;

    // Tag encoding: tileID in the high bits, epoch check bits in the low.
    tag = (tileID << numCheckBits) | (epochID & bitMask);

    FPRINTF(stderr,"Rank %d sends field '%c', tile %d, to i/o task %d with tag %d(%d)\n",
            computeIntracommRank, dataFieldID, tileID, receiver, tag, tag >> numCheckBits);

    MPI_Send(data, tileOneZLevelSizeInBytes * p->zDepth,
             MPI_BYTE, receiver, tag, p->dataIntercomm);

    p->tileCount += 1;
    priorDataFieldID = dataFieldID;
    priorEpoch = epochID;
}
1879
1880
1881
1882 void
1883 f4(int epochID)
1884 {
1885 int i;
1886 ASSERT(computeIntracommRank >= 0);
1887
1888 if (0 == computeIntracommRank) {
1889 int cmd[3] = { cmd_exit, -1, -1 };
1890
1891 // Recv the ack that the prior i/o epoch is complete
1892 MPI_Recv(NULL, 0, MPI_BYTE, 0, cmd_epochComplete,
1893 globalIntercomm, MPI_STATUS_IGNORE);
1894
1895 // Tell the i/o ranks to exit
1896 // Just tell rank 0; it will bcast to the others
1897 MPI_Send(cmd, 3, MPI_INT, 0, 0, globalIntercomm);
1898 }
1899
1900 }
1901
1902
1903
1904 void myasyncio_set_global_sizes_(int *nx, int *ny, int *nz,
1905 int *nt, int *nb,
1906 int *tx, int *ty,
1907 int *ox, int *oy)
1908 {
1909
1910
1911 int data[] = {*nx,*ny,*nz,*nt,*nb,*tx,*ty,*ox,*oy};
1912
1913 int items = sizeof(data)/sizeof(int);
1914
1915
1916 NTILES = *nt; // total number of tiles
1917
1918
1919 TILE_X = *tx;
1920 TILE_Y = *ty;
1921 XGHOSTS = *ox;
1922 YGHOSTS = *oy;
1923 tileOneZLevelItemCount = ((TILE_X + 2*XGHOSTS) * (TILE_Y + 2*YGHOSTS));
1924 tileOneZLevelSizeInBytes = (tileOneZLevelItemCount * datumSize); // needed by compute ranks in f3
1925
1926
1927 int rank,ierr;
1928 MPI_Comm_rank(globalIntercomm,&rank);
1929
1930
1931 if (0==rank)
1932 printf("%d %d %d\n%d %d\n%d %d\n%d %d\n",*nx,*ny,*nz,*nt,*nb,*tx,*ty,*ox,*oy);
1933
1934
1935 /*
1936 if (0==rank)
1937 ierr=MPI_Bcast(&NTILES,1,MPI_INT,MPI_ROOT,globalIntercomm);
1938 else
1939 ierr=MPI_Bcast(&NTILES,1,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1940 */
1941
1942 if (0==rank)
1943 ierr=MPI_Bcast(data,items,MPI_INT,MPI_ROOT,globalIntercomm);
1944 else
1945 ierr=MPI_Bcast(data,items,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1946
1947 }
1948
1949 void asyncio_tile_arrays_(int *xoff, int *yoff, int *xskip)
1950 {
1951 int rank,ierr;
1952 MPI_Comm_rank(globalIntercomm,&rank);
1953
1954 if (0==rank)
1955 ierr=MPI_Bcast(xoff,NTILES,MPI_INT,MPI_ROOT,globalIntercomm);
1956 else
1957 ierr=MPI_Bcast(xoff,NTILES,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1958
1959 if (0==rank)
1960 ierr=MPI_Bcast(yoff,NTILES,MPI_INT,MPI_ROOT,globalIntercomm);
1961 else
1962 ierr=MPI_Bcast(yoff,NTILES,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1963
1964 if (0==rank)
1965 ierr=MPI_Bcast(xskip,NTILES,MPI_INT,MPI_ROOT,globalIntercomm);
1966 else
1967 ierr=MPI_Bcast(xskip,NTILES,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1968
1969 }
1970
1971
1972
1973
1974
1975 //////////////////////////////////////////////////////////////////////
1976 // Fortran interface
1977
1978 void
1979 bron_f1(
1980 MPI_Fint *ptr_parentComm,
1981 int *ptr_numComputeRanks,
1982 int *ptr_totalNumTiles,
1983 MPI_Fint *rtnComputeComm)
1984 {
1985 // Convert the Fortran handle into a C handle
1986 MPI_Comm newComm, parentComm = MPI_Comm_f2c(*ptr_parentComm);
1987
1988 f1(parentComm, *ptr_numComputeRanks, *ptr_totalNumTiles, &newComm);
1989
1990 // Convert the C handle into a Fortran handle
1991 *rtnComputeComm = MPI_Comm_c2f(newComm);
1992 }
1993
1994
1995
void
bron_f2(int *ptr_tileID)
{
  // Fortran-callable wrapper around f2: dereference the tile id and
  // forward it.
  int tileID = *ptr_tileID;
  f2(tileID);
}
2001
2002
void
beginnewepoch_(int *newEpochID, int *gcmIter, int *epochStyle)
{
  // Fortran-callable wrapper around beginNewEpoch: dereference the
  // arguments and forward them.
  int epochID = *newEpochID;
  int iter    = *gcmIter;
  int style   = *epochStyle;

  beginNewEpoch(epochID, iter, style);
}
2008
2009
void
bron_f3(char *ptr_dataFieldID, int *ptr_tileID, int *ptr_epochID, void *data)
{
  // Fortran-callable wrapper around f3: dereference the scalar
  // arguments and forward them along with the tile data pointer.
  char fieldID = *ptr_dataFieldID;
  int tileID   = *ptr_tileID;
  int epochID  = *ptr_epochID;

  f3(fieldID, tileID, epochID, data);
}
2015
2016
2017
void
bron_f4(int *ptr_epochID)
{
  // Fortran-callable wrapper around f4: dereference the epoch id and
  // forward it.
  int epochID = *ptr_epochID;
  f4(epochID);
}
2023

  ViewVC Help
Powered by ViewVC 1.1.22