/[MITgcm]/MITgcm_contrib/llc_hires/llc_90/code-async/recvTask.c
ViewVC logotype

Contents of /MITgcm_contrib/llc_hires/llc_90/code-async/recvTask.c

Parent Directory Parent Directory | Revision Log Revision Log | View Revision Graph Revision Graph


Revision 1.2 - (show annotations) (download)
Tue Oct 3 02:33:27 2017 UTC (7 years, 10 months ago) by dimitri
Branch: MAIN
Changes since 1.1: +3 -3 lines
File MIME type: text/plain
This code and instructions were tested to work on pleiades with checkpoint65v
There is something not quite right about checkpoint65v dumpfreq output.
For example, 2D output files are 421080 instead of 421200 long.

1
2 // Code to do the m-on-n fan-in, recompositing, and output, of data
3 // tiles for mitGCM.
4 //
5 // There are aspects of this code that would be simpler in C++, but
6 // we deliberately wrote the code in ansi-C to make linking it with
7 // the Fortran main code easier (should be able to just include the
8 // .o on the link line).
9
10
11 #include "PACKAGES_CONFIG.h"
12
13 #include <stdio.h>
14 #include <unistd.h>
15 #include <stdlib.h>
16 #include <string.h>
17 #include <sys/types.h>
18 #include <sys/stat.h>
19 #include <fcntl.h>
20
21 #include <mpi.h>
22 #include <alloca.h>
23
24
25 #include <stdint.h>
26 #include <errno.h>
27
// Debug verbosity: 0 = quiet, 1 = enable the sleeping ASSERT below,
// >= 2 = additionally enable FPRINTF trace output.
#define DEBUG 1

#if (DEBUG >= 2)
#define FPRINTF fprintf
#else
#include <stdarg.h>
// Stub that swallows all trace output (and its varargs) when DEBUG < 2.
void FPRINTF(FILE *fp,...){return;}
#endif
36
37
// Define our own version of "assert", where we sleep rather than abort.
// This makes it easier to attach the debugger.
//
// NOTE(review): the macro expands to a bare if-block, so it is NOT safe
// as the unbraced body of an if/else (dangling-else hazard). It cannot
// simply be wrapped in do{...}while(0) because at least one call site in
// this file invokes ASSERT(...) with no trailing semicolon (see the last
// loop of redistributeZSlabs), which relies on the bare-if expansion.
#if (DEBUG >= 1)
#define ASSERT(_expr) \
    if (!(_expr)) { \
        fprintf(stderr, "ASSERT failed for pid %d : `%s': %s: %d: %s\n", \
                getpid(), #_expr, __FILE__, __LINE__, __func__); \
        sleep(9999); \
    }
#else
#define ASSERT assert
#endif
50
51
52
// If numRanksPerNode is set to be > 0, we just use that setting.
// If it is <= 0, we dynamically determine the number of cores on
// a node, and use that. (N.B.: we assume *all* nodes being used in
// the run have the *same* number of cores.)
int numRanksPerNode = 0;

// Just an error check; can be zero (if you're confident it's all correct).
// The low numCheckBits of each MPI tag carry the epoch ID as a sanity
// cross-check against the tile ID in the high bits (see tryToReceiveDataTile).
#define numCheckBits 2
// This bitMask definition works for 2's complement
#define bitMask ((1 << numCheckBits) - 1)
63
64
///////////////////////////////////////////////////////////////////////
// Info about the data fields

// double for now. Might also be float someday
#define datumType double
#define datumSize (sizeof(datumType))


// Info about the data fields. We assume that all the fields have the
// same X and Y characteristics, but may have either 1 or NUM_Z levels.
//
// the following 3 values are required here. Could be sent over or read in
// with some rearrangement in init routines
//
#define NUM_X 90
#define NUM_Y 1170L // get rid of this someday  (long so size products below are long)
#define NUM_Z 50
#define MULTDIM 7   // seaice category count; must match MULTDIM in SEAICE_SIZE.h
// Byte sizes of one whole *field* (not one tile) at 1, NUM_Z, MULTDIM levels
#define twoDFieldSizeInBytes (NUM_X * NUM_Y * 1 * datumSize)
#define threeDFieldSizeInBytes (twoDFieldSizeInBytes * NUM_Z)
#define multDFieldSizeInBytes (twoDFieldSizeInBytes * MULTDIM)
86
// Info about the data tiles. We assume that all the tiles are the same
// size (no odd-sized last piece), they all have the same X and Y
// characteristics (including ghosting), and are full depth in Z
// (either 1, or NUM_Z, or MULTDIM, as appropriate).
//
// all these data now sent over from compute ranks
//
int TILE_X = -1;   // tile interior extent in x (excludes ghost cells)
int TILE_Y = -1;   // tile interior extent in y (excludes ghost cells)
int XGHOSTS = -1;  // ghost-cell halo width in x, each side
int YGHOSTS = -1;  // ghost-cell halo width in y, each side

// Size of one Z level of a tile (NOT one Z level of a whole field)
int tileOneZLevelItemCount = -1;
int tileOneZLevelSizeInBytes = -1;
102
103
// Maps a one-character field ID to the number of Z levels that field
// carries (1, NUM_Z, or MULTDIM).
typedef struct dataFieldDepth {
    char dataFieldID;
    int numZ;
} dataFieldDepth_t;

dataFieldDepth_t fieldDepths[] = {
    { 'A', MULTDIM }, // seaice, 7 == MULTDIM in SEAICE_SIZE.h

    // Single-level (2-D) fields
    { 'B', 1 },
    { 'C', 1 },
    { 'D', 1 },
    { 'E', 1 },
    { 'F', 1 },
    { 'G', 1 },
    { 'H', 1 },
    { 'I', 1 },
    { 'J', 1 },
    { 'K', 1 },
    { 'L', 1 },
    { 'M', 1 },
    { 'N', 1 },
    { 'O', 1 },
    { 'P', 1 },
    { 'Q', 1 },
    { 'R', 1 },

    // Full-depth (3-D) fields
    { 'S', NUM_Z },
    { 'T', NUM_Z },
    { 'U', NUM_Z },
    { 'V', NUM_Z },
    { 'W', NUM_Z },
    { 'X', NUM_Z },
    { 'Y', NUM_Z },
};
#define numAllFields (sizeof(fieldDepths)/sizeof(dataFieldDepth_t))
139
140
141
///////////////////////////////////////////////////////////////////////
// Info about the various kinds of i/o epochs.
// One entry per field written during an epoch of a given style; the
// communicators are MPI_COMM_NULL until set up at registration time.
typedef struct epochFieldInfo {
    char dataFieldID;             // one-char tag; must match fieldDepths[]
    MPI_Comm registrationIntercomm;
    MPI_Comm dataIntercomm;       // compute ranks -> i/o ranks, tile data
    MPI_Comm ioRanksIntracomm;    // group of i/o ranks handling this field
    int tileCount;
    int zDepth; // duplicates the fieldDepth entry; filled in automatically
    char filenameTemplate[128];   // printf template applied to (gcmIter, "data")
    long int offset;              // byte offset of this field within a pickup file
    int pickup;                   // 0 = normal dump, 1 = pickup, 2 = seaice pickup
} fieldInfoThisEpoch_t;
155
// The normal i/o dump: one output file per field. The table is
// terminated by a sentinel whose dataFieldID is '\0'; zDepth (-1 here)
// is filled in automatically from fieldDepths.
fieldInfoThisEpoch_t fieldsForEpochStyle_0[] = {
    { 'U', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "U.%010d.%s", 0, 0 },
    { 'V', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "V.%010d.%s", 0, 0 },
    { 'W', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "W.%010d.%s", 0, 0 },
    { 'S', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "Salt.%010d.%s", 0, 0 },
    { 'T', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "Theta.%010d.%s", 0,0 },
    { 'N', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "Eta.%010d.%s", 0,0 },

    { 'B', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIarea.%010d.%s", 0,0 },
    { 'C', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIheff.%010d.%s", 0,0 },
    { 'D', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIhsnow.%010d.%s", 0,0 },
    { 'E', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIuice.%010d.%s", 0,0 },
    { 'F', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIvice.%010d.%s", 0,0 },
    { 'G', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIhsalt.%010d.%s", 0,0 },

    //{ 'H', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "EtaHnm1.%010d.%s", 0,0 },
    { 'I', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceTAUX.%010d.%s", 0,0 },
    { 'J', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceTAUY.%010d.%s", 0,0 },
    { 'K', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "KPPhbl.%010d.%s", 0,0 },
    { 'L', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceSflux.%010d.%s", 0,0 },

    { 'M', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceFWflx.%010d.%s", 0,0 },
    { 'O', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceQnet.%010d.%s", 0,0 },
    { 'P', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "PhiBot.%010d.%s", 0,0 },
    { 'Q', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceQsw.%010d.%s", 0,0 },
    //{ 'R', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "dEtaHdt.%010d.%s", 0,0 },

    // Sentinel: end of table
    {'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "", 0,0 },
};
186
187
// pickup file: all fields share one file; "offset" is each field's byte
// position within it — six 3-D records (Uvel, Vvel, Theta, Salt, GuNm1,
// GvNm1) followed by the 2-D records (EtaN, dEtaHdt, EtaH), matching the
// field list written by write_pickup_meta for pickup==1.
fieldInfoThisEpoch_t fieldsForEpochStyle_1[] = {
    { 'U', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 0 + twoDFieldSizeInBytes * 0, 1},
    { 'V', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 0, 1},
    { 'T', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 2 + twoDFieldSizeInBytes * 0, 1},
    { 'S', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 3 + twoDFieldSizeInBytes * 0, 1},
    { 'X', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 4 + twoDFieldSizeInBytes * 0, 1},
    { 'Y', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 5 + twoDFieldSizeInBytes * 0, 1},
    { 'N', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 0, 1},
    { 'R', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 1, 1},
    { 'H', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 2, 1},
    {'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "", 0 ,1},
};
201
202
// seaice pickup: one MULTDIM record (siTICES) followed by six 2-D
// records, in the order written by write_pickup_meta for pickup==2
// (siAREA, siHEFF, siHSNOW, siHSALT, siUICE, siVICE).
fieldInfoThisEpoch_t fieldsForEpochStyle_2[] = {
    { 'A', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 0 + twoDFieldSizeInBytes * 0, 2},
    { 'B', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 0, 2},
    { 'C', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 1, 2},
    { 'D', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 2, 2},
    { 'G', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 3, 2},
    { 'E', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 4, 2},
    { 'F', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 5, 2},
    // NOTE(review): this sentinel omits the final "pickup" initializer
    // (defaults to 0, unlike the 2 above); harmless since the sentinel is
    // detected via dataFieldID == '\0', but inconsistent with the other tables.
    {'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "", 0 },
};
214
215
// All known epoch styles; each is a '\0'-terminated field table above.
fieldInfoThisEpoch_t *epochStyles[] = {
    fieldsForEpochStyle_0,
    fieldsForEpochStyle_1,
    fieldsForEpochStyle_2,
};
int numEpochStyles = (sizeof(epochStyles) / sizeof(fieldInfoThisEpoch_t *));


// Epoch control commands.
// NOTE(review): the code that exchanges these is not in this chunk;
// presumably sent between compute and i/o sides — confirm against the
// rest of the file.
typedef enum {
    cmd_illegal,
    cmd_newEpoch,
    cmd_epochComplete,
    cmd_exit,
} epochCmd_t;
230
231
///////////////////////////////////////////////////////////////////////

// Note that a rank will only have access to one of the Intracomms,
// but all ranks will define the Intercomm.
MPI_Comm ioIntracomm = MPI_COMM_NULL;
int ioIntracommRank = -1;
MPI_Comm computeIntracomm = MPI_COMM_NULL;
int computeIntracommRank = -1;
MPI_Comm globalIntercomm = MPI_COMM_NULL;


// Integer ceiling-divide and round-up-to-multiple helpers.
// (Macro arguments are evaluated more than once; avoid side effects.)
#define divCeil(_x,_y) (((_x) + ((_y) - 1)) / (_y))
#define roundUp(_x,_y) (divCeil((_x),(_y)) * (_y))
245
246
247
// Lifecycle state of a tile receive buffer.
typedef enum {
    bufState_illegal,
    bufState_Free,
    bufState_InUse,
} bufState_t;


// Header for one tile receive buffer, linked into either the free list
// or the in-use list below.
typedef struct buf_header{
    struct buf_header *next;   // free-list / in-use-list link
    bufState_t state;
    int requestsArraySize;     // number of slots in requests[]
    MPI_Request *requests;     // outstanding Isends from redistributeZSlabs
    uint64_t ck0;              // guard word, set to 0xdeadbeefdeadbeef
    char *payload; // [tileOneZLevelSizeInBytes * NUM_Z]; // Max payload size
    uint64_t ck1; // now rec'vd from compute ranks & dynamically allocated in
} bufHdr_t;  // allocateTileBufs
// NOTE(review): ck0/ck1 bracket the payload *pointer* inside this header,
// not the payload buffer itself, so the overrun check in
// tryToReceiveDataTile only detects corruption of the header.


bufHdr_t *freeTileBufs_ptr = NULL;
bufHdr_t *inUseTileBufs_ptr = NULL;

int maxTagValue = -1;
// Total tile count across the whole model; set elsewhere (-1 until then).
int totalNumTiles = -1;
271
// Copy *n doubles from d into f, converting each value to single
// precision (round-to-nearest, exactly what the cvtsd2ss instruction in
// the previous hand-coded version did).
//
// f : destination array of *n floats
// d : source array of *n doubles
// n : element count (pointer, for Fortran-style call compatibility)
//
// NOTE(review): byte-swapping is still not done here (see do_write);
// the original "need to get byteswapping in here as well" comment stands.
//
// Rewritten portably: the original was an implicit-int function (invalid
// since C99) built from x86 SSE inline assembly whose prefetches also
// formed addresses past the end of both arrays. A plain loop produces
// identical results and is auto-vectorized by current compilers at -O2.
void
memcpy_r8_2_r4 (float *f, double *d, long long *n)
{
    long long i;
    long long count = *n;
    for (i = 0; i < count; ++i) {
        f[i] = (float) d[i];
    }
}
319
320 // Debug routine
321 countBufs(int nbufs)
322 {
323 int nInUse, nFree;
324 bufHdr_t *bufPtr;
325 static int target = -1;
326
327 if (-1 == target) {
328 ASSERT(-1 != nbufs);
329 target = nbufs;
330 }
331
332 bufPtr = freeTileBufs_ptr;
333 for (nFree = 0; bufPtr != NULL; bufPtr = bufPtr->next) nFree += 1;
334
335 bufPtr = inUseTileBufs_ptr;
336 for (nInUse = 0; bufPtr != NULL; bufPtr = bufPtr->next) nInUse += 1;
337
338 if (nInUse + nFree != target) {
339 fprintf(stderr, "Rank %d: bad number of buffs: free %d, inUse %d, should be %d\n",
340 ioIntracommRank, nFree, nInUse, target);
341 sleep(5000);
342 }
343 }
344
// Read exactly nbytes from fd unless EOF or an error intervenes.
// Returns the number of bytes actually read (short only at EOF), or the
// negative result of read() on error.
int readn(int fd, void *p, int nbytes)
{
    char *cursor = (char*)(p);
    int remaining = nbytes;

    while (remaining > 0) {
        int got = read(fd, cursor, remaining);
        if (got < 0)
            return (got);   // error
        if (got == 0)
            break;          // EOF

        cursor += got;
        remaining -= got;
    }
    return (nbytes - remaining);
}
365
366
// Write exactly nbytes to fd, retrying short writes and EINTR.
// Returns nbytes on success, or the failing (non-positive) write()
// result on a real error.
ssize_t writen(int fd, void *p, size_t nbytes)
{
    char *cursor = (char*)(p);
    size_t remaining = nbytes;

    while (remaining > 0) {
        ssize_t put = write(fd, cursor, remaining);
        if (put <= 0) {
            if (errno == EINTR) continue;  // interrupted: retry (POSIX, not SVr4)
            return (put);                  // non-EINTR error
        }

        cursor += put;
        remaining -= put;
    }
    return (nbytes - remaining);
}
387
388
// Write a MITgcm-style ".meta" companion file describing a pickup
// (pickup==1) or seaice-pickup (pickup==2) binary file.
//   fp      - open text stream for the .meta output
//   gcmIter - model timestep number, written as timeStepNumber
//   pickup  - 1 or 2; anything else trips the (sleeping) ASSERT
void write_pickup_meta(FILE *fp, int gcmIter, int pickup)
{
    int i;
    int ndims = 2;
    int nrecords,nfields;
    char**f;
    char*fld1[] = {"Uvel","Vvel","Theta","Salt","GuNm1","GvNm1","EtaN","dEtaHdt","EtaH"};
    char*fld2[] = {"siTICES","siAREA","siHEFF","siHSNOW","siHSALT","siUICE","siVICE"};
    // for now, just list the fields here. When the whole field specification apparatus
    // is cleaned up, pull the names out of the epochstyle definition or whatever

    ASSERT(1==pickup||2==pickup);

    if (1==pickup){
        nrecords = 6*NUM_Z+3;      // six 3-D fields plus three 2-D fields
        nfields = sizeof(fld1)/sizeof(char*);
        f = fld1;
    }
    else if (2==pickup){
        nrecords = MULTDIM+6;      // one MULTDIM field plus six 2-D fields
        nfields = sizeof(fld2)/sizeof(char*);
        f = fld2;
    }

    fprintf(fp," nDims = [ %3d ];\n",ndims);
    fprintf(fp," dimList = [\n");
    fprintf(fp," %10u,%10d,%10u,\n",NUM_X,1,NUM_X);
    fprintf(fp," %10ld,%10d,%10ld\n",NUM_Y,1,NUM_Y);
    fprintf(fp," ];\n");
    fprintf(fp," dataprec = [ 'float64' ];\n");
    fprintf(fp," nrecords = [ %5d ];\n",nrecords);
    fprintf(fp," timeStepNumber = [ %10d ];\n",gcmIter);
    fprintf(fp," timeInterval = [ %19.12E ];\n",0.0); // what should this be?
    fprintf(fp," nFlds = [ %4d ];\n",nfields);
    fprintf(fp," fldList = {\n");
    for (i=0;i<nfields;++i)
        fprintf(fp," '%-8s'",f[i]);
    fprintf(fp,"\n };\n");
}
428
429
430
// Staging buffer for the z-slab(s) this rank reassembles and writes;
// grown on demand in processSlabSection, so it only needs to hold
// myNumZSlabs levels, not the whole field.
double *outBuf=NULL;//[NUM_X*NUM_Y*NUM_Z]; // only needs to be myNumZSlabs
size_t outBufSize=0;
433
434
// Byte-swap the assembled slab(s) in outBuf in place (64-bit swaps for
// pickup files, which stay double precision; 32-bit swaps otherwise,
// since non-pickup data has already been converted to float), then write
// them to the output file at the position for [firstZ, firstZ+numZ).
//   io_epoch   - epoch number (currently unused except for debugging)
//   whichField - field descriptor (filename template, offset, pickup flag)
//   firstZ     - first global z level held in outBuf
//   numZ       - number of z levels held in outBuf (NOT the global NUM_Z)
//   gcmIter    - timestep number, substituted into the filename template
void
do_write(int io_epoch, fieldInfoThisEpoch_t *whichField, int firstZ, int numZ, int gcmIter)
{
    if (0==numZ) return; // this is *not* global NUM_Z : change name of parameter to avoid grief!

    int pickup = whichField->pickup;

    ///////////////////////////////
    // swap here, if necessary
    // int i,j;

    //i = NUM_X*NUM_Y*numZ;
    //mds_byteswapr8_(&i,outBuf);

    // mds_byteswapr8 expects an integer count, which is gonna overflow someday
    // can't redefine to long without affecting a bunch of other calls
    // so do a slab at a time here, to delay the inevitable
    // i = NUM_X*NUM_Y;
    //for (j=0;j<numZ;++j)
    //  mds_byteswapr8_(&i,&outBuf[i*j]);

    // gnu builtin evidently honored by intel compilers

    if (pickup) {
        // pickup data are doubles: swap 8 bytes at a time
        uint64_t *alias = (uint64_t*)outBuf;
        size_t i;
        for (i=0;i<NUM_X*NUM_Y*numZ;++i)
            alias[i] = __builtin_bswap64(alias[i]);
    }
    else {
        // dump data were converted to float in processSlabSection:
        // swap 4 bytes at a time
        uint32_t *alias = (uint32_t*)outBuf;
        size_t i;
        for (i=0;i<NUM_X*NUM_Y*numZ;++i)
            alias[i] = __builtin_bswap32(alias[i]);
    }

    // end of swappiness
    //////////////////////////////////

    char s[1024];
    //sprintf(s,"henze_%d_%d_%c.dat",io_epoch,gcmIter,whichField->dataFieldID);

    // Build the output filename from the template, e.g. "U.%010d.%s"
    sprintf(s,whichField->filenameTemplate,gcmIter,"data");

    int fd = open(s,O_CREAT|O_WRONLY,S_IRWXU|S_IRGRP);
    ASSERT(fd!=-1);

    size_t nbytes;

    if (pickup) {
        // Seek to this field's position in the shared pickup file,
        // then to this rank's first z level within the field.
        lseek(fd,whichField->offset,SEEK_SET);
        lseek(fd,firstZ*NUM_X*NUM_Y*datumSize,SEEK_CUR);
        nbytes = NUM_X*NUM_Y*numZ*datumSize;
    }
    else {
        // File position is 0 right after open, so this SEEK_CUR is
        // effectively an absolute seek to this rank's float slab.
        lseek(fd,firstZ*NUM_X*NUM_Y*sizeof(float),SEEK_CUR);
        nbytes = NUM_X*NUM_Y*numZ*sizeof(float);
    }

    ssize_t bwrit = writen(fd,outBuf,nbytes);

    if (-1==bwrit) perror("Henze : file write problem : ");

    FPRINTF(stderr,"Wrote %d of %d bytes (%d -> %d) \n",bwrit,nbytes,firstZ,numZ);

    // ASSERT(nbytes == bwrit);

    // NOTE(review): %ld matches ssize_t/size_t only on LP64 platforms;
    // %zd/%zu would be portable.
    if (nbytes!=bwrit)
        fprintf(stderr,"WROTE %ld /%ld\n",bwrit,nbytes);

    close(fd);


    return;
}
510
511
// Number of tiles in the offsetTable below; set elsewhere (-1 until then).
int NTILES = -1;

// Where a tile's data lands within one z level of the reassembled field:
// "off" is the element offset of the tile's first point, "skip" is the
// element stride between successive tile rows.
typedef struct {
    int off;
    int skip;
} tile_layout_t;

// Indexed by tileID (1-based; see the ASSERT in processSlabSection).
tile_layout_t *offsetTable;
520
// Copy one tile's redistributed z-slab section into outBuf at the tile's
// horizontal position, stripping ghost cells row by row. For pickup
// fields the data stay double; otherwise each row is converted to float
// via memcpy_r8_2_r4. outBuf is (re)allocated here on demand.
//   whichField  - field descriptor (for the intracomm and pickup flag)
//   tileID      - 1-based tile index into offsetTable
//   data        - ghosted tile slab as received (doubles)
//   myNumZSlabs - number of z levels in this slab section
void
processSlabSection(
    fieldInfoThisEpoch_t *whichField,
    int tileID,
    void *data,
    int myNumZSlabs)
{
    int intracommSize,intracommRank;
    // MPI_Comm_size(whichField->ioRanksIntracomm, &intracommSize);
    MPI_Comm_rank(whichField->ioRanksIntracomm, &intracommRank);
    //printf("i/o rank %d/%d recv'd %d::%d (%d->%d)\n",intracommRank,intracommSize,whichField,tileID,firstZ,lastZ);
    // printf("rank %d : tile %d is gonna go at %d and stride with %d, z = %d\n",intracommRank,tileID,offsetTable[tileID].off,
    //  offsetTable[tileID].skip,myNumZSlabs);

    int z;

    int pickup = whichField->pickup;

    //ASSERT((tileID > 0) && (tileID < (sizeof(offsetTable)/sizeof(tile_layout_t))));
    ASSERT( (tileID > 0) && (tileID <= NTILES) );  // offsetTable now dynamically allocated

    // Grow the staging buffer if this field needs more than we have.
    if (myNumZSlabs * twoDFieldSizeInBytes > outBufSize){

        free(outBuf);

        outBufSize = myNumZSlabs * twoDFieldSizeInBytes;

        outBuf = malloc(outBufSize);
        ASSERT(outBuf);

        memset(outBuf,0,outBufSize);
    }

    for (z=0;z<myNumZSlabs;++z){

        // Destination offsets are in *elements* (double or float).
        off_t zoffset = z*TILE_X*TILE_Y*NTILES; //NOT totalNumTiles;
        off_t hoffset = offsetTable[tileID].off;
        off_t skipdst = offsetTable[tileID].skip;

        //double *dst = outBuf + zoffset + hoffset;
        void *dst;
        if (pickup)
            dst = outBuf + zoffset + hoffset;           // double elements
        else
            dst = (float*)outBuf + zoffset + hoffset;   // float elements

        // Source offsets skip over the ghost-cell halo of the tile.
        // NOTE(review): "+ YGHOSTS" at the end looks like it should be
        // "+ XGHOSTS" (columns, not rows); benign whenever
        // XGHOSTS == YGHOSTS — confirm.
        off_t zoff = z*(TILE_X+2*XGHOSTS)*(TILE_Y+2*YGHOSTS);
        off_t hoff = YGHOSTS*(TILE_X+2*XGHOSTS) + YGHOSTS;
        double *src = (double*)data + zoff + hoff;

        off_t skipsrc = TILE_X+2*XGHOSTS;

        //fprintf(stderr,"rank %d tile %d offset %d skip %d dst %x src %x\n",intracommRank,tileID,hoffset,skipdst,dst,src);

        long long n = TILE_X;

        // Copy one interior row at a time, converting to float unless
        // this is a pickup (which stays double).
        int y;
        if (pickup)
            for (y=0;y<TILE_Y;++y)
                memcpy((double*)dst + y * skipdst, src + y * skipsrc, TILE_X*datumSize);
        else
            for (y=0;y<TILE_Y;++y)
                memcpy_r8_2_r4((float*)dst + y * skipdst, src + y * skipsrc, &n);
    }

    return;
}
588
589
590
591 // Allocate some buffers to receive tile-data from the compute ranks
592 void
593 allocateTileBufs(int numTileBufs, int maxIntracommSize)
594 {
595
596 ASSERT(tileOneZLevelSizeInBytes>0); // be sure we have rec'vd values by now
597
598 int i, j;
599 for (i = 0; i < numTileBufs; ++i) {
600
601 bufHdr_t *newBuf = malloc(sizeof(bufHdr_t));
602 ASSERT(NULL != newBuf);
603
604 newBuf->payload = malloc(tileOneZLevelSizeInBytes * NUM_Z);
605 ASSERT(NULL != newBuf->payload);
606
607 newBuf->requests = malloc(maxIntracommSize * sizeof(MPI_Request));
608 ASSERT(NULL != newBuf->requests);
609
610 // Init some values
611 newBuf->requestsArraySize = maxIntracommSize;
612 for (j = 0; j < maxIntracommSize; ++j) {
613 newBuf->requests[j] = MPI_REQUEST_NULL;
614 }
615 newBuf->ck0 = newBuf->ck1 = 0xdeadbeefdeadbeef;
616
617 // Put the buf into the free list
618 newBuf->state = bufState_Free;
619 newBuf->next = freeTileBufs_ptr;
620 freeTileBufs_ptr = newBuf;
621 }
622 }
623
624
625
626 bufHdr_t *
627 getFreeBuf()
628 {
629 int j;
630 bufHdr_t *rtnValue = freeTileBufs_ptr;
631
632 if (NULL != rtnValue) {
633 ASSERT(bufState_Free == rtnValue->state);
634 freeTileBufs_ptr = rtnValue->next;
635 rtnValue->next = NULL;
636
637 // Paranoia. This should already be the case
638 for (j = 0; j < rtnValue->requestsArraySize; ++j) {
639 rtnValue->requests[j] = MPI_REQUEST_NULL;
640 }
641 }
642 return rtnValue;
643 }
644
645
646 /////////////////////////////////////////////////////////////////
647
648
// Non-blocking attempt to receive one tile from a compute rank.
// Iprobe first; if a message is pending AND a free buffer is available,
// receive it, sanity-check the size and the epoch bits of the tag, and
// return the buffer (state set to InUse) with *tileID decoded from the
// tag's high bits. Returns NULL if nothing is pending or no buffer is free.
bufHdr_t *
tryToReceiveDataTile(
    MPI_Comm dataIntercomm,
    int epochID,
    size_t expectedMsgSize,
    int *tileID)
{
    bufHdr_t *bufHdr;
    int pending, i, count;
    MPI_Status mpiStatus;

    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, dataIntercomm,
               &pending, &mpiStatus);

    // If no data are pending, or if we can't get a buffer to
    // put the pending data into, return NULL
    if (!pending) return NULL;
    if (((bufHdr = getFreeBuf()) == NULL)) {
        FPRINTF(stderr,"tile %d(%d) pending, but no buf to recv it\n",
                mpiStatus.MPI_TAG, mpiStatus.MPI_TAG >> numCheckBits);
        return NULL;
    }

    // Do sanity checks on the pending message: full-depth tile size,
    // and low tag bits matching the current epoch.
    MPI_Get_count(&mpiStatus, MPI_BYTE, &count);
    ASSERT(count == expectedMsgSize);
    ASSERT((mpiStatus.MPI_TAG & bitMask) == (epochID & bitMask));

    // Recieve the data we saw in the iprobe
    // (source and tag pinned from the probe status, so we get that message)
    MPI_Recv(bufHdr->payload, expectedMsgSize, MPI_BYTE,
             mpiStatus.MPI_SOURCE, mpiStatus.MPI_TAG,
             dataIntercomm, &mpiStatus);
    bufHdr->state = bufState_InUse;

    // Overrun check (header guard words; see bufHdr_t)
    ASSERT(bufHdr->ck0==0xdeadbeefdeadbeef);
    ASSERT(bufHdr->ck0==bufHdr->ck1);

    // Return values: tile ID is in the tag's high bits
    *tileID = mpiStatus.MPI_TAG >> numCheckBits;


    FPRINTF(stderr, "recv tile %d(%d) from compute rank %d\n",
            mpiStatus.MPI_TAG, *tileID, mpiStatus.MPI_SOURCE);

    return bufHdr;
}
696
697
698
// Non-blocking attempt to receive one redistributed z-slab from a peer
// i/o rank in this field's intracomm. Returns the tileID (the message
// tag) on success, or -1 if no slab is pending. The message size must
// match expectedMsgSize exactly.
int
tryToReceiveZSlab(
    void *buf,
    int expectedMsgSize,
    MPI_Comm intracomm)
{
    MPI_Status mpiStatus;
    int pending, count;

    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, intracomm, &pending, &mpiStatus);
    if (!pending) return -1;

    MPI_Get_count(&mpiStatus, MPI_BYTE, &count);
    ASSERT(count == expectedMsgSize);

    // Receive exactly the message we probed (source and tag pinned)
    MPI_Recv(buf, count, MPI_BYTE, mpiStatus.MPI_SOURCE,
             mpiStatus.MPI_TAG, intracomm, &mpiStatus);

    FPRINTF(stderr, "recv slab %d from rank %d\n",
            mpiStatus.MPI_TAG, mpiStatus.MPI_SOURCE);

    // return the tileID
    return mpiStatus.MPI_TAG;
}
723
724
725
726 void
727 redistributeZSlabs(
728 bufHdr_t *bufHdr,
729 int tileID,
730 int zSlabsPer,
731 int thisFieldNumZ,
732 MPI_Comm intracomm)
733 {
734 int pieceSize = zSlabsPer * tileOneZLevelSizeInBytes;
735 int tileSizeInBytes = tileOneZLevelSizeInBytes * thisFieldNumZ;
736 int offset = 0;
737 int recvRank = 0;
738
739 while ((offset + pieceSize) <= tileSizeInBytes) {
740
741 MPI_Isend(bufHdr->payload + offset, pieceSize, MPI_BYTE, recvRank,
742 tileID, intracomm, &(bufHdr->requests[recvRank]));
743 ASSERT(MPI_REQUEST_NULL != bufHdr->requests[recvRank]);
744
745 offset += pieceSize;
746 recvRank += 1;
747 }
748
749 // There might be one last odd-sized piece
750 if (offset < tileSizeInBytes) {
751 pieceSize = tileSizeInBytes - offset;
752 ASSERT(pieceSize % tileOneZLevelSizeInBytes == 0);
753
754 MPI_Isend(bufHdr->payload + offset, pieceSize, MPI_BYTE, recvRank,
755 tileID, intracomm, &(bufHdr->requests[recvRank]));
756 ASSERT(MPI_REQUEST_NULL != bufHdr->requests[recvRank]);
757
758 offset += pieceSize;
759 recvRank += 1;
760 }
761
762 // Sanity check
763 ASSERT(recvRank <= bufHdr->requestsArraySize);
764 while (recvRank < bufHdr->requestsArraySize) {
765 ASSERT(MPI_REQUEST_NULL == bufHdr->requests[recvRank++])
766 }
767 }
768
769
770
771 /////////////////////////////////////////////////////////////////
772 // This is called by the i/o ranks
773 void
774 doNewEpoch(int epochID, int epochStyleIndex, int gcmIter)
775 {
776 // In an i/o epoch, the i/o ranks are partitioned into groups,
777 // each group dealing with one field. The ranks within a group:
778 // (1) receive data tiles from the compute ranks
779 // (2) slice the received data tiles into slabs in the 'z'
780 // dimension, and redistribute the tile-slabs among the group.
781 // (3) receive redistributed tile-slabs, and reconstruct a complete
782 // field-slab for the whole field.
783 // (4) Write the completed field-slab to disk.
784
785 fieldInfoThisEpoch_t *fieldInfo;
786 int intracommRank, intracommSize;
787
788 int zSlabsPer;
789 int myNumZSlabs, myFirstZSlab, myLastZSlab, myNumSlabPiecesToRecv;
790
791 int numTilesRecvd, numSlabPiecesRecvd;
792
793
794 // Find the descriptor for my assigned field for this epoch style.
795 // It is the one whose dataIntercomm is not null
796 fieldInfo = epochStyles[epochStyleIndex];
797 while (MPI_COMM_NULL == fieldInfo->dataIntercomm) ++fieldInfo;
798 ASSERT('\0' != fieldInfo->dataFieldID);
799
800
801
802 MPI_Comm_rank(fieldInfo->ioRanksIntracomm, &intracommRank);
803 MPI_Comm_size(fieldInfo->ioRanksIntracomm, &intracommSize);
804
805
806 // Compute which z slab(s) we will be reassembling.
807 zSlabsPer = divCeil(fieldInfo->zDepth, intracommSize);
808 myNumZSlabs = zSlabsPer;
809
810 // Adjust myNumZSlabs in case it didn't divide evenly
811 myFirstZSlab = intracommRank * myNumZSlabs;
812 if (myFirstZSlab >= fieldInfo->zDepth) {
813 myNumZSlabs = 0;
814 } else if ((myFirstZSlab + myNumZSlabs) > fieldInfo->zDepth) {
815 myNumZSlabs = fieldInfo->zDepth - myFirstZSlab;
816 } else {
817 myNumZSlabs = zSlabsPer;
818 }
819 myLastZSlab = myFirstZSlab + myNumZSlabs - 1;
820
821 // If we were not assigned any z-slabs, we don't get any redistributed
822 // tile-slabs. If we were assigned one or more slabs, we get
823 // redistributed tile-slabs for every tile.
824 myNumSlabPiecesToRecv = (0 == myNumZSlabs) ? 0 : totalNumTiles;
825
826
827 numTilesRecvd = 0;
828 numSlabPiecesRecvd = 0;
829
830 ////////////////////////////////////////////////////////////////////
831 // Main loop. Handle tiles from the current epoch
832 for(;;) {
833 bufHdr_t *bufHdr;
834
835 ////////////////////////////////////////////////////////////////
836 // Check for tiles from the computational tasks
837 while (numTilesRecvd < fieldInfo->tileCount) {
838 int tileID = -1;
839 size_t msgSize = tileOneZLevelSizeInBytes * fieldInfo->zDepth;
840 bufHdr = tryToReceiveDataTile(fieldInfo->dataIntercomm, epochID,
841 msgSize, &tileID);
842 if (NULL == bufHdr) break; // No tile was received
843
844 numTilesRecvd += 1;
845 redistributeZSlabs(bufHdr, tileID, zSlabsPer,
846 fieldInfo->zDepth, fieldInfo->ioRanksIntracomm);
847
848 // Add the bufHdr to the "in use" list
849 bufHdr->next = inUseTileBufs_ptr;
850 inUseTileBufs_ptr = bufHdr;
851
852
853 }
854
855
856 ////////////////////////////////////////////////////////////////
857 // Check for tile-slabs redistributed from the i/o processes
858 while (numSlabPiecesRecvd < myNumSlabPiecesToRecv) {
859 int msgSize = tileOneZLevelSizeInBytes * myNumZSlabs;
860 char data[msgSize];
861
862 int tileID = tryToReceiveZSlab(data, msgSize, fieldInfo->ioRanksIntracomm);
863 if (tileID < 0) break; // No slab was received
864
865
866 numSlabPiecesRecvd += 1;
867 processSlabSection(fieldInfo, tileID, data, myNumZSlabs);
868
869
870 // Can do the write here, or at the end of the epoch.
871 // Probably want to make it asynchronous (waiting for
872 // completion at the barrier at the start of the next epoch).
873 //if (numSlabPiecesRecvd >= myNumSlabPiecesToRecv) {
874 // ASSERT(numSlabPiecesRecvd == myNumSlabPiecesToRecv);
875 // do_write(io_epoch,whichField,myFirstZSlab,myNumZSlabs);
876 //}
877 }
878
879
880 ////////////////////////////////////////////////////////////////
881 // Check if we can release any of the inUse buffers (i.e. the
882 // isends we used to redistribute those z-slabs have completed).
883 // We do this by detaching the inUse list, then examining each
884 // element in the list, putting each element either into the
885 // free list or back into the inUse list, as appropriate.
886 bufHdr = inUseTileBufs_ptr;
887 inUseTileBufs_ptr = NULL;
888 while (NULL != bufHdr) {
889 int count = 0;
890 int completions[intracommSize];
891 MPI_Status statuses[intracommSize];
892
893 // Acquire the "next" bufHdr now, before we change anything.
894 bufHdr_t *nextBufHeader = bufHdr->next;
895
896 // Check to see if the Isend requests have completed for this buf
897 MPI_Testsome(intracommSize, bufHdr->requests,
898 &count, completions, statuses);
899
900 if (MPI_UNDEFINED == count) {
901 // A return of UNDEFINED means that none of the requests
902 // is still outstanding, i.e. they have all completed.
903 // Put the buf on the free list.
904 bufHdr->state = bufState_Free;
905 bufHdr->next = freeTileBufs_ptr;
906 freeTileBufs_ptr = bufHdr;
907 FPRINTF(stderr,"Rank %d freed a tile buffer\n", intracommRank);
908 } else {
909 // At least one request is still outstanding.
910 // Put the buf on the inUse list.
911 bufHdr->next = inUseTileBufs_ptr;
912 inUseTileBufs_ptr = bufHdr;
913 }
914
// Continue with the next buffer
916 bufHdr = nextBufHeader;
917 }
918
919
920 ////////////////////////////////////////////////////////////////
921 // Check if the epoch is complete
922 if ((numTilesRecvd >= fieldInfo->tileCount) &&
923 (numSlabPiecesRecvd >= myNumSlabPiecesToRecv) &&
924 (NULL == inUseTileBufs_ptr))
925 {
926 ASSERT(numTilesRecvd == fieldInfo->tileCount);
927 ASSERT(numSlabPiecesRecvd == myNumSlabPiecesToRecv);
928
929 //fprintf(stderr,"rank %d %d %d %d\n",intracommRank,numTilesRecvd,numSlabPiecesRecvd,myNumZSlabs);
930
931 // Ok, wrap up the current i/o epoch
932 // Probably want to make this asynchronous (waiting for
933 // completion at the start of the next epoch).
934 do_write(epochID, fieldInfo, myFirstZSlab, myNumZSlabs, gcmIter);
935
936 // The epoch is complete
937 return;
938 }
939 }
940
941 }
942
943
944
945
946
947
948
// Main routine for the i/o tasks. Callers DO NOT RETURN from
// this routine; they MPI_Finalize and exit.
void
ioRankMain(int totalNumTiles)
{
    // This is one of a group of i/o processes that will be receiving tiles
    // from the computational processes.  This same process is also
    // responsible for compositing slices of tiles together into one or
    // more complete slabs, and then writing those slabs out to disk.
    //
    // When a tile is received, it is cut up into slices, and each slice is
    // sent to the appropriate compositing task that is dealing with those
    // "z" level(s) (possibly including ourselves).  These are tagged with
    // the tile-id.  When we *receive* such a message (from ourselves, or
    // from any other process in this group), we unpack the data (stripping
    // the ghost cells), and put them into the slab we are assembling.  Once
    // a slab is fully assembled, it is written to disk.

    int i, size, count, numTileBufs;
    MPI_Status status;
    int currentEpochID = 0;  // local epoch counter; must track cmd[1] exactly
    int maxTileCount = 0, max3DTileCount = 0, maxIntracommSize = 0;

    int numComputeRanks, epochStyleIndex;
    MPI_Comm_remote_size(globalIntercomm, &numComputeRanks);


    ////////////////////////////////////////////////////////////
    //// get global tile info via bcast over the global intercom


    // int ierr = MPI_Bcast(&NTILES,1,MPI_INT,0,globalIntercomm);

    // The compute side broadcasts nine ints of geometry info; only
    // indices 3 and 5..8 are consumed here.
    int data[9];

    int ierr = MPI_Bcast(data,9,MPI_INT,0,globalIntercomm);

    NTILES = data[3];
    TILE_X = data[5];
    TILE_Y = data[6];
    XGHOSTS = data[7];
    YGHOSTS = data[8];

    // Per-z-level tile footprint *including* ghost cells on all four sides.
    tileOneZLevelItemCount = ((TILE_X + 2*XGHOSTS) * (TILE_Y + 2*YGHOSTS));
    tileOneZLevelSizeInBytes = (tileOneZLevelItemCount * datumSize); // also calculated in compute ranks, urghh

    // Receive the per-tile layout (x/y offsets and row stride) and build
    // the 1-based offsetTable used when compositing tiles into slabs.
    int *xoff = malloc(NTILES*sizeof(int));
    ASSERT(xoff);
    int *yoff = malloc(NTILES*sizeof(int));
    ASSERT(yoff);
    int *xskip = malloc(NTILES*sizeof(int));
    ASSERT(xskip);

    offsetTable = malloc((NTILES+1)*sizeof(tile_layout_t));
    ASSERT(offsetTable);

    ierr = MPI_Bcast(xoff,NTILES,MPI_INT,0,globalIntercomm);
    ierr = MPI_Bcast(yoff,NTILES,MPI_INT,0,globalIntercomm);
    ierr = MPI_Bcast(xskip,NTILES,MPI_INT,0,globalIntercomm);

    // Convert 1-based Fortran-style (xoff,yoff) into a 0-based linear
    // offset into the global NUM_X-wide domain; entry 0 is unused
    // (tile IDs are 1-based).
    for (i=0;i<NTILES;++i){
        offsetTable[i+1].off = NUM_X*(yoff[i]-1) + xoff[i] - 1;
        offsetTable[i+1].skip = xskip[i];
    }

    free(xoff);
    free(yoff);
    free(xskip);

    ///////////////////////////////////////////////////////////////


    // At this point, the multitude of various inter-communicators have
    // been created and stored in the various epoch "style" arrays.
    // The next step is to have the computational tasks "register" the
    // data tiles they will send.  In this version of the code, we only
    // track and store *counts*, not the actual tileIDs.
    for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
        fieldInfoThisEpoch_t *p = NULL;
        fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex];
        int thisIntracommSize;

        // Find the field we were assigned for this epoch style
        // (the one whose registrationIntercomm is non-null for us).
        int curFieldIndex = -1;
        while ('\0' != thisEpochStyle[++curFieldIndex].dataFieldID) {
            p = &(thisEpochStyle[curFieldIndex]);
            if (MPI_COMM_NULL != p->registrationIntercomm) break;
        }
        ASSERT(NULL != p);
        ASSERT('\0' != p->dataFieldID);
        MPI_Comm_size(p->ioRanksIntracomm, &size);
        if (size > maxIntracommSize) maxIntracommSize = size;


        // Receive a message from *each* computational rank telling us
        // a count of how many tiles that rank will send during epochs
        // of this style.  Note that some of these counts may be zero.
        // The count is carried in the message *tag*; the body is empty.
        for (i = 0; i < numComputeRanks; ++i) {
            MPI_Recv(NULL, 0, MPI_BYTE, MPI_ANY_SOURCE, MPI_ANY_TAG,
                     p->registrationIntercomm, &status);
            p->tileCount += status.MPI_TAG;
        }
        if (p->tileCount > maxTileCount) maxTileCount = p->tileCount;
        if (p->zDepth > 1) {
            if (p->tileCount > max3DTileCount) max3DTileCount = p->tileCount;
        }

        // Sanity check: the per-rank counts for this field must sum to
        // the global number of tiles.
        MPI_Allreduce (&(p->tileCount), &count, 1,
                       MPI_INT, MPI_SUM, p->ioRanksIntracomm);
        ASSERT(count == totalNumTiles);
    }


    // In some as-of-yet-undetermined fashion, we decide how many
    // buffers to allocate to hold tiles received from the computational
    // tasks.  The number of such buffers is more-or-less arbitrary (the
    // code should be able to function with any number greater than zero).
    // More buffers means more parallelism, but uses more space.
    // Note that maxTileCount is an upper bound on the number of buffers
    // that we can usefully use.  Note also that if we are assigned a 2D
    // field, we might be assigned to receive a very large number of tiles
    // for that field in that epoch style.

    // Hack - for now, just pick a value for numTileBufs, clamped to [4,15].
    numTileBufs = (max3DTileCount > 0) ? max3DTileCount : maxTileCount;
    if (numTileBufs < 4) numTileBufs = 4;
    if (numTileBufs > 15) numTileBufs = 15;
    // if (numTileBufs > 25) numTileBufs = 25;

    allocateTileBufs(numTileBufs, maxIntracommSize);
    countBufs(numTileBufs);


    ////////////////////////////////////////////////////////////////////
    // Main loop.  One iteration per i/o epoch; never exits except via
    // the cmd_exit command (MPI_Finalize + exit).
    ////////////////////////////////////////////////////////////////////

    for (;;) {
        int cmd[4];  // [0]=command, [1]=epochID, [2]=epoch style, [3]=gcmIter

        // Make sure all the i/o ranks have completed processing the prior
        // cmd (i.e. the prior i/o epoch is complete).
        MPI_Barrier(ioIntracomm);

        if (0 == ioIntracommRank) {
            // Rank 0 acks completion of the previous epoch to the compute
            // side, then waits for the next command.
            fprintf(stderr, "I/O ranks waiting for new epoch\n");
            MPI_Send(NULL, 0, MPI_BYTE, 0, cmd_epochComplete, globalIntercomm);

            MPI_Recv(cmd, 4, MPI_INT, 0, 0, globalIntercomm, MPI_STATUS_IGNORE);
            fprintf(stderr, "I/O ranks begining new epoch: %d, gcmIter = %d\n", cmd[1],cmd[3]);

            // before we start a new epoch, have i/o rank 0:
            //   determine output filenames for this epoch
            //   clean up any extant files with same names
            //   write .meta files

            if (cmd_exit != cmd[0]){

                fprintf(stderr,"new epoch: epoch %d, style %d, gcmIter %d\n", cmd[1],cmd[2],cmd[3]);

                int epochStyle = cmd[2];
                int gcmIter = cmd[3];

                fieldInfoThisEpoch_t *fieldInfo;
                fieldInfo = epochStyles[epochStyle];
                char s[1024];
                int res;
                FILE *fp;

                if (fieldInfo->pickup==0){ // for non-pickups, need to loop over individual fields
                    char f;
                    // NOTE: assignment (not comparison) in the condition is
                    // intentional here: loop until the '\0' field terminator.
                    while (f = fieldInfo->dataFieldID){
                        sprintf(s,fieldInfo->filenameTemplate,gcmIter,"data");
                        fprintf(stderr,"%s\n",s);
                        res = unlink(s);
                        // ENOENT (file did not exist) is the expected case.
                        if (-1==res && ENOENT!=errno) fprintf(stderr,"unable to rm %s\n",s);

                        // skip writing meta files for non-pickup fields
                        /*
                        sprintf(s,fieldInfo->filenameTemplate,gcmIter,"meta");
                        fp = fopen(s,"w+");
                        fclose(fp);
                        */

                        ++fieldInfo;

                    }
                }

                else { // single pickup or pickup_seaice file

                    sprintf(s,fieldInfo->filenameTemplate,gcmIter,"data");
                    fprintf(stderr,"%s\n",s);
                    res = unlink(s);
                    if (-1==res && ENOENT!=errno) fprintf(stderr,"unable to rm %s\n",s);

                    sprintf(s,fieldInfo->filenameTemplate,gcmIter,"meta");
                    fp = fopen(s,"w+");
                    write_pickup_meta(fp, gcmIter, fieldInfo->pickup);
                    fclose(fp);

                }
            }
        }
        // Propagate the command from i/o rank 0 to all i/o ranks.
        MPI_Bcast(cmd, 4, MPI_INT, 0, ioIntracomm);

        switch (cmd[0]) {

        case cmd_exit:
            // Don't bother trying to disconnect and free the
            // plethora of communicators; just exit.
            FPRINTF(stderr,"Received shutdown message\n");
            MPI_Finalize();
            exit(0);
            break;

        case cmd_newEpoch:
            // Epoch IDs must arrive strictly in sequence.
            if ((currentEpochID + 1) != cmd[1]) {
                fprintf(stderr, "ERROR: Missing i/o epoch? was %d, "
                        "but is now %d ??\n", currentEpochID, cmd[1]);
                sleep(1); // Give the message a chance to propagate
                abort();
            }
            currentEpochID = cmd[1];

            memset(outBuf,0,outBufSize); // zero the outBuf, so dry tiles are well defined

            doNewEpoch(cmd[1], cmd[2], cmd[3]);
            break;

        default:
            fprintf(stderr, "Unexpected epoch command: %d %d %d %d\n",
                    cmd[0], cmd[1], cmd[2], cmd[3]);
            sleep(1);
            abort();
            break;
        }

    }

}
1191
1192
1193
1194 ///////////////////////////////////////////////////////////////////////////////////
1195
1196 int
1197 findField(const char c)
1198 {
1199 int i;
1200 for (i = 0; i < numAllFields; ++i) {
1201 if (c == fieldDepths[i].dataFieldID) return i;
1202 }
1203
1204 // Give the error message a chance to propagate before exiting.
1205 fprintf(stderr, "ERROR: Field not found: '%c'\n", c);
1206 sleep(1);
1207 abort();
1208 }
1209
1210
1211
1212 // Given a number of ranks and a set of fields we will want to output,
1213 // figure out how to distribute the ranks among the fields.
1214 void
1215 computeIORankAssigments(
1216 int numComputeRanks,
1217 int numIORanks,
1218 int numFields,
1219 fieldInfoThisEpoch_t *fields,
1220 int assignments[])
1221 {
1222 int i,j,k, sum, count;
1223
1224 int numIONodes = numIORanks / numRanksPerNode;
1225 int numIORanksThisField[numFields];
1226 long int bytesPerIORankThisField[numFields];
1227 int expectedMessagesPerRankThisField[numFields];
1228 long int fieldSizes[numFields];
1229
1230 struct ioNodeInfo_t {
1231 int expectedNumMessagesThisNode;
1232 int numCoresAssigned;
1233 int *assignedFieldThisCore;
1234 } ioNodeInfo[numIONodes];
1235
1236 // Since numRanksPerNode might be dynamically determined, we have
1237 // to dynamically allocate the assignedFieldThisCore arrays.
1238 for (i = 0; i < numIONodes; ++i) {
1239 ioNodeInfo[i].assignedFieldThisCore = malloc(sizeof(int)*numRanksPerNode);
1240 ASSERT(NULL != ioNodeInfo[i].assignedFieldThisCore);
1241 }
1242
1243 ASSERT((numIONodes*numRanksPerNode) == numIORanks);
1244
1245
1246 //////////////////////////////////////////////////////////////
1247 // Compute the size for each field in this epoch style
1248 for (i = 0; i < numFields; ++i) {
1249 ASSERT((1 == fields[i].zDepth) || (NUM_Z == fields[i].zDepth) || (MULTDIM == fields[i].zDepth));
1250 fieldSizes[i] = twoDFieldSizeInBytes * fields[i].zDepth;
1251 }
1252
1253
1254 /////////////////////////////////////////////////////////
1255 // Distribute the available i/o ranks among the fields,
1256 // proportionally based on field size.
1257
1258 // First, assign one rank per field
1259 ASSERT(numIORanks >= numFields);
1260 for (i = 0; i < numFields; ++i) {
1261 numIORanksThisField[i] = 1;
1262 bytesPerIORankThisField[i] = fieldSizes[i];
1263 }
1264
1265 // Now apportion any extra ranks
1266 for (; i < numIORanks; ++i) {
1267
1268 // Find the field 'k' with the most bytesPerIORank
1269 k = 0;
1270 for (j = 1; j < numFields; ++j) {
1271 if (bytesPerIORankThisField[j] > bytesPerIORankThisField[k]) {
1272 k = j;
1273 }
1274 }
1275
1276 // Assign an i/o rank to that field
1277 numIORanksThisField[k] += 1;
1278 bytesPerIORankThisField[k] = fieldSizes[k] / numIORanksThisField[k];
1279 }
1280
1281 ////////////////////////////////////////////////////////////
1282 // At this point, all the i/o ranks should be apportioned
1283 // among the fields. Check we didn't screw up the count.
1284 for (sum = 0, i = 0; i < numFields; ++i) {
1285 sum += numIORanksThisField[i];
1286 }
1287 ASSERT(sum == numIORanks);
1288
1289
1290
1291 //////////////////////////////////////////////////////////////////
1292 // The *number* of i/o ranks assigned to a field is based on the
1293 // field size. But the *placement* of those ranks is based on
1294 // the expected number of messages the process will receive.
1295 // [In particular, if we have several fields each with only one
1296 // assigned i/o rank, we do not want to place them all on the
1297 // same node if we don't have to.] This traffic-load balance
1298 // strategy is an approximation at best since the messages may
1299 // not all be the same size (e.g. 2D fields vs. 3D fields), so
1300 // "number of messages" is not the same thing as "traffic".
1301 // But it should suffice.
1302
1303 // Init a couple of things
1304 for (i = 0; i < numFields; ++i) {
1305 expectedMessagesPerRankThisField[i] =
1306 numComputeRanks / numIORanksThisField[i];
1307 }
1308 for (i = 0; i < numIONodes; ++i) {
1309 ioNodeInfo[i].expectedNumMessagesThisNode = 0;
1310 ioNodeInfo[i].numCoresAssigned = 0;
1311 for (j = 0; j < numRanksPerNode; ++j) {
1312 ioNodeInfo[i].assignedFieldThisCore[j] = -1;
1313 }
1314 }
1315
1316 ///////////////////////////////////////////////////////////////////
1317 // Select the i/o node with the smallest expectedNumMessages, and
1318 // assign it a rank from the field with the highest remaining
1319 // expectedMessagesPerRank. Repeat until everything is assigned.
1320 // (Yes, this could be a lot faster, but isn't worth the trouble.)
1321
1322 for (count = 0; count < numIORanks; ++count) {
1323
1324 // Make an initial choice of node
1325 for (i = 0; i < numIONodes; ++i) {
1326 if (ioNodeInfo[i].numCoresAssigned < numRanksPerNode) break;
1327 }
1328 j = i;
1329 ASSERT(j < numIONodes);
1330
1331 // Search for a better choice
1332 for (++i; i < numIONodes; ++i) {
1333 if (ioNodeInfo[i].numCoresAssigned >= numRanksPerNode) continue;
1334 if (ioNodeInfo[i].expectedNumMessagesThisNode <
1335 ioNodeInfo[j].expectedNumMessagesThisNode)
1336 {
1337 j = i;
1338 }
1339 }
1340
1341
1342 // Make an initial choice of field
1343 for (i = 0; i < numFields; ++i) {
1344 if (numIORanksThisField[i] > 0) break;
1345 }
1346 k = i;
1347 ASSERT(k < numFields);
1348
1349 // Search for a better choice
1350 for (++i; i < numFields; ++i) {
1351 if (numIORanksThisField[i] <= 0) continue;
1352 if (expectedMessagesPerRankThisField[i] >
1353 expectedMessagesPerRankThisField[k])
1354 {
1355 k = i;
1356 }
1357 }
1358
1359 // Put one rank from the chosen field onto the chosen node
1360 ioNodeInfo[j].expectedNumMessagesThisNode += expectedMessagesPerRankThisField[k];
1361 ioNodeInfo[j].assignedFieldThisCore[ioNodeInfo[j].numCoresAssigned] = k;
1362 ioNodeInfo[j].numCoresAssigned += 1;
1363 numIORanksThisField[k] -= 1;
1364 }
1365
1366 // Sanity check - all ranks were assigned to a core
1367 for (i = 1; i < numFields; ++i) {
1368 ASSERT(0 == numIORanksThisField[i]);
1369 }
1370 // Sanity check - all cores were assigned a rank
1371 for (i = 1; i < numIONodes; ++i) {
1372 ASSERT(numRanksPerNode == ioNodeInfo[i].numCoresAssigned);
1373 }
1374
1375
1376 /////////////////////////////////////
1377 // Return the computed assignments
1378 for (i = 0; i < numIONodes; ++i) {
1379 for (j = 0; j < numRanksPerNode; ++j) {
1380 assignments[i*numRanksPerNode + j] =
1381 ioNodeInfo[i].assignedFieldThisCore[j];
1382 }
1383 }
1384
1385 // Clean up
1386 for (i = 0; i < numIONodes; ++i) {
1387 free(ioNodeInfo[i].assignedFieldThisCore);
1388 }
1389
1390 }
1391
1392 //////////////////////////////////////////////////////////////////////////////////
1393
1394
1395
1396
1397 int
1398 isIORank(int commRank, int totalNumNodes, int numIONodes)
1399 {
1400 // Figure out if this rank is on a node that will do i/o.
1401 // Note that the i/o nodes are distributed throughout the
1402 // task, not clustered together.
1403 int ioNodeStride = totalNumNodes / numIONodes;
1404 int thisRankNode = commRank / numRanksPerNode;
1405 int n = thisRankNode / ioNodeStride;
1406 return (((n * ioNodeStride) == thisRankNode) && (n < numIONodes)) ? 1 : 0;
1407 }
1408
1409
// Find the number of *physical cores* on the current node
// (ignore hyperthreading).  This should work for pretty much
// any Linux based system (and fail horribly for anything else).
//
// NOTE(review): the dynamic detection below is currently disabled by
// the unconditional "return 20" (it misbehaved on the target system);
// the dead code is retained so it can be re-enabled once that is
// understood.
int
getNumCores(void)
{
    return 20; // until we figure out why this cratered

    // Count unique (physical id, core id) pairs in /proc/cpuinfo,
    // i.e. physical cores across all sockets.
    // (Const-correct: a string literal must not be bound to a
    // modifiable char*.)
    static const char *magic =
        "cat /proc/cpuinfo | egrep \"core id|physical id\" | "
        "tr -d \"\\n\" | sed s/physical/\\\\nphysical/g | "
        "grep -v ^$ | sort -u | wc -l";

    FILE *fp = popen(magic, "r");
    ASSERT(fp);

    int rtnValue = -1;

    int res = fscanf(fp, "%d", &rtnValue);

    ASSERT(1 == res);

    pclose(fp);

    ASSERT(rtnValue > 0);
    return rtnValue;
}
1436
1437
1438
1439 ////////////////////////////////////////////////////////////////////////
1440 ////////////////////////////////////////////////////////////////////////
1441 // Routines called by the mitGCM code
1442
1443
1444 ////////////////////////////////////////
// Various "one time" initializations.
//
// Called (once) by every MPI rank.  Splits the world into compute,
// i/o, and "excess" ranks; builds the per-epoch-style, per-field
// intra- and inter-communicators; then the i/o ranks disappear into
// ioRankMain (never returning), excess ranks exit, and compute ranks
// return their new intra-communicator via *rtnComputeComm.
void
f1(
    MPI_Comm parentComm,
    int numComputeRanks,
    int numTiles,
    MPI_Comm *rtnComputeComm)
{
    MPI_Comm newIntracomm, newIntercomm, dupParentComm;
    int newIntracommRank, thisIsIORank;
    int parentSize, parentRank;
    int numIONodes, numIORanks;
    int mpiIsInitialized, *intPtr, tagUBexists;
    int i, j, nF, compRoot, fieldIndex, epochStyleIndex;
    int totalNumNodes, numComputeNodes, newIntracommSize;

    // Init globals
    totalNumTiles = numTiles;
    if (numRanksPerNode <= 0) {
        // Might also want to check for an env var (or something)
        numRanksPerNode = getNumCores();
    }

    // Fill in the zDepth field of the fieldInfoThisEpoch_t descriptors,
    // cross-checking any value already present against fieldDepths.
    for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
        fieldInfoThisEpoch_t *f, *thisEpochStyle = epochStyles[epochStyleIndex];

        int curFieldIndex = -1;
        while ((f = &(thisEpochStyle[++curFieldIndex]))->dataFieldID != '\0') {
            i = findField(f->dataFieldID);
            if (-1 != f->zDepth) {
                // Inconsistency is reported but not fatal; fieldDepths wins.
                if (f->zDepth != fieldDepths[i].numZ) {
                    fprintf(stderr, "Inconsistent z-depth for field '%c': "
                            "fieldDepths[%d] and epoch style %d\n",
                            f->dataFieldID, i, epochStyleIndex);
                }
            }
            f->zDepth = fieldDepths[i].numZ;
        }
    }


    // Find the max MPI "tag" value (tile IDs are encoded into tags, so
    // we need to know the ceiling).
    // NOTE(review): MPI_Attr_get is deprecated since MPI-2; the modern
    // equivalent is MPI_Comm_get_attr — confirm before modernizing.
    MPI_Attr_get(MPI_COMM_WORLD, MPI_TAG_UB, &intPtr, &tagUBexists);
    ASSERT(tagUBexists);
    maxTagValue = *intPtr;


    // Info about the parent communicator
    MPI_Initialized(&mpiIsInitialized);
    ASSERT(mpiIsInitialized);
    MPI_Comm_size(parentComm, &parentSize);
    MPI_Comm_rank(parentComm, &parentRank);


    // Figure out how many nodes we can use for i/o.
    // To make things (e.g. memory calculations) simpler, we want
    // a node to have either all compute ranks, or all i/o ranks.
    // If numComputeRanks does not evenly divide numRanksPerNode, we have
    // to round up in favor of the compute side.

    totalNumNodes = divCeil(parentSize, numRanksPerNode);
    numComputeNodes = divCeil(numComputeRanks, numRanksPerNode);
    numIONodes = (parentSize - numComputeRanks) / numRanksPerNode;
    ASSERT(numIONodes > 0);
    ASSERT(numIONodes <= (totalNumNodes - numComputeNodes));
    numIORanks = numIONodes * numRanksPerNode;


    // It is surprisingly easy to launch the job with the wrong number
    // of ranks, particularly if the number of compute ranks is not a
    // multiple of numRanksPerNode (the tendency is to just launch one
    // rank per core for all available cores).  So we introduce a third
    // option for a rank: besides being an i/o rank or a compute rank,
    // it might instead be an "excess" rank, in which case it just
    // calls MPI_Finalize and exits.

    typedef enum { isCompute, isIO, isExcess } rankType;
    rankType thisRanksType;

    if (isIORank(parentRank, totalNumNodes, numIONodes)) {
        thisRanksType = isIO;
    } else if (parentRank >= numComputeRanks + numIORanks) {
        thisRanksType = isExcess;
    } else {
        thisRanksType = isCompute;
    }

    // Split the parent communicator into parts (the enum value doubles
    // as the split "color").
    MPI_Comm_split(parentComm, thisRanksType, parentRank, &newIntracomm);
    MPI_Comm_rank(newIntracomm, &newIntracommRank);

    // Excess ranks disappear
    // N.B. "parentSize" still includes these ranks.
    if (isExcess == thisRanksType) {
        MPI_Finalize();
        exit(0);
    }

    // Sanity check
    MPI_Comm_size(newIntracomm, &newIntracommSize);
    if (isIO == thisRanksType) {
        ASSERT(newIntracommSize == numIORanks);
    } else {
        ASSERT(newIntracommSize == numComputeRanks);
    }


    // Create a special intercomm from the i/o and compute parts.
    // Each side must name the *other* side's lowest rank as the
    // remote leader; 'i' carries that rank out of the if/else.
    if (isIO == thisRanksType) {
        // Set globals
        ioIntracomm = newIntracomm;
        MPI_Comm_rank(ioIntracomm, &ioIntracommRank);

        // Find the lowest computation rank
        for (i = 0; i < parentSize; ++i) {
            if (!isIORank(i, totalNumNodes, numIONodes)) break;
        }
    } else { // isCompute
        // Set globals
        computeIntracomm = newIntracomm;
        MPI_Comm_rank(computeIntracomm, &computeIntracommRank);

        // Find the lowest IO rank
        for (i = 0; i < parentSize; ++i) {
            if (isIORank(i, totalNumNodes, numIONodes)) break;
        }
    }
    MPI_Intercomm_create(newIntracomm, 0, parentComm, i, 0, &globalIntercomm);



    ///////////////////////////////////////////////////////////////////
    // For each different i/o epoch style, split the i/o ranks among
    // the fields, and create an inter-communicator for each split.

    for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
        fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex];
        int fieldAssignments[numIORanks];
        int rankAssignments[numComputeRanks + numIORanks];

        // Count the number of fields in this epoch style
        for (nF = 0; thisEpochStyle[nF].dataFieldID != '\0'; ++nF) ;

        // Decide how to apportion the i/o ranks among the fields
        for (i=0; i < numIORanks; ++i) fieldAssignments[i] = -1;
        computeIORankAssigments(numComputeRanks, numIORanks, nF,
                                thisEpochStyle, fieldAssignments);
        // Sanity check
        for (i=0; i < numIORanks; ++i) {
            ASSERT((fieldAssignments[i] >= 0) && (fieldAssignments[i] < nF));
        }

        // Embed the i/o rank assignments into an array holding
        // the assignments for *all* the ranks (i/o and compute).
        // Rank assignment of "-1" means "compute".
        j = 0;
        for (i = 0; i < numComputeRanks + numIORanks; ++i) {
            rankAssignments[i] = isIORank(i, totalNumNodes, numIONodes) ?
                fieldAssignments[j++] : -1;
        }
        // Sanity check.  Check the assignment for this rank.
        if (isIO == thisRanksType) {
            ASSERT(fieldAssignments[newIntracommRank] == rankAssignments[parentRank]);
        } else {
            ASSERT(-1 == rankAssignments[parentRank]);
        }
        ASSERT(j == numIORanks);

        // Rank 0 prints a human-readable map of the cpu assignments.
        if (0 == parentRank) {
            printf("\ncpu assignments, epoch style %d\n", epochStyleIndex);
            for (i = 0; i < numComputeNodes + numIONodes ; ++i) {
                if (rankAssignments[i*numRanksPerNode] >= 0) {
                    // i/o node
                    for (j = 0; j < numRanksPerNode; ++j) {
                        printf(" %1d", rankAssignments[i*numRanksPerNode + j]);
                    }
                } else {
                    // compute node
                    for (j = 0; j < numRanksPerNode; ++j) {
                        if ((i*numRanksPerNode + j) >= (numComputeRanks + numIORanks)) break;
                        ASSERT(-1 == rankAssignments[i*numRanksPerNode + j]);
                    }
                    printf(" #");
                    for (; j < numRanksPerNode; ++j) { // "excess" ranks (if any)
                        printf("X");
                    }
                }
                printf(" ");
            }
            printf("\n\n");
        }

        // Find the lowest rank assigned to computation; use it as
        // the "root" for the upcoming intercomm creates.
        for (compRoot = 0; rankAssignments[compRoot] != -1; ++compRoot);
        // Sanity check
        if ((isCompute == thisRanksType) && (0 == newIntracommRank)) ASSERT(compRoot == parentRank);


        // If this is an I/O rank, split the newIntracomm (which, for
        // the i/o ranks, is a communicator among all the i/o ranks)
        // into the pieces as assigned.

        if (isIO == thisRanksType) {
            MPI_Comm fieldIntracomm;
            int myField = rankAssignments[parentRank];
            ASSERT(myField >= 0);

            MPI_Comm_split(newIntracomm, myField, parentRank, &fieldIntracomm);
            thisEpochStyle[myField].ioRanksIntracomm = fieldIntracomm;

            // Now create an inter-communicator between the computational
            // ranks, and each of the newly split off field communicators.
            for (i = 0; i < nF; ++i) {

                // Do one field at a time to avoid clashes on parentComm
                MPI_Barrier(newIntracomm);

                if (myField != i) continue;

                // Create the intercomm for this field for this epoch style
                MPI_Intercomm_create(fieldIntracomm, 0, parentComm,
                                     compRoot, i, &(thisEpochStyle[myField].dataIntercomm));

                // ... and dup a separate one for tile registrations
                MPI_Comm_dup(thisEpochStyle[myField].dataIntercomm,
                             &(thisEpochStyle[myField].registrationIntercomm));
            }
        }
        else {
            // This is a computational rank; create the intercommunicators
            // to the various split off separate field communicators.

            for (fieldIndex = 0; fieldIndex < nF; ++fieldIndex) {

                // Find the remote "root" process for this field
                int fieldRoot = -1;
                while (rankAssignments[++fieldRoot] != fieldIndex);

                // Create the intercomm for this field for this epoch style
                MPI_Intercomm_create(newIntracomm, 0, parentComm, fieldRoot,
                                     fieldIndex, &(thisEpochStyle[fieldIndex].dataIntercomm));

                // ... and dup a separate one for tile registrations
                MPI_Comm_dup(thisEpochStyle[fieldIndex].dataIntercomm,
                             &(thisEpochStyle[fieldIndex].registrationIntercomm));
            }
        }

    } // epoch style loop


    // I/O processes split off and start receiving data
    // NOTE: the I/O processes DO NOT RETURN from this call
    if (isIO == thisRanksType) ioRankMain(totalNumTiles);


    // The "compute" processes now return with their new intra-communicator.
    *rtnComputeComm = newIntracomm;

    // but first, call mpiio initialization routine!
    initSizesAndTypes();

    return;
}
1711
1712
1713
1714
1715 // "Register" the tile-id(s) that this process will be sending
1716 void
1717 f2(int tileID)
1718 {
1719 int i, epochStyleIndex;
1720
1721 static int count = 0;
1722
1723 // This code assumes that the same tileID will apply to all the
1724 // fields. Note that we use the tileID as a tag, so we require it
1725 // be an int with a legal tag value, but we do *NOT* actually use
1726 // the tag *value* for anything (e.g. it is NOT used as an index
1727 // into an array). It is treated as an opaque handle that is
1728 // passed from the compute task(s) to the compositing routines.
1729 // Those two end-points likely assign meaning to the tileID (i.e.
1730 // use it to identify where the tile belongs within the domain),
1731 // but that is their business.
1732 //
1733 // A tileID of -1 signifies the end of registration.
1734
1735 ASSERT(computeIntracommRank >= 0);
1736
1737 if (tileID >= 0) {
1738 // In this version of the code, in addition to the tileID, we also
1739 // multiplex in the low order bit(s) of the epoch number as an
1740 // error check. So the tile value must be small enough to allow that.
1741 ASSERT(((tileID<<numCheckBits) + ((1<<numCheckBits)-1)) < maxTagValue);
1742
1743 // In this version of the code, we do not send the actual tileID's
1744 // to the i/o processes during the registration procedure. We only
1745 // send a count of the number of tiles that we will send.
1746 ++count;
1747 return;
1748 }
1749
1750
1751 // We get here when we are called with a negative tileID, signifying
1752 // the end of registration. We now need to figure out and inform
1753 // *each* i/o process in *each* field just how many tiles we will be
1754 // sending them in *each* epoch style.
1755
1756 for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
1757 fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex];
1758
1759 int curFieldIndex = -1;
1760 while ('\0' != thisEpochStyle[++curFieldIndex].dataFieldID) {
1761 fieldInfoThisEpoch_t *thisField = &(thisEpochStyle[curFieldIndex]);
1762 int numRemoteRanks, *tileCounts, remainder;
1763 MPI_Comm_remote_size(thisField->dataIntercomm, &numRemoteRanks);
1764
1765 tileCounts = alloca(numRemoteRanks * sizeof(int));
1766
1767 memset(tileCounts,0,numRemoteRanks * sizeof(int));
1768
1769 // Distribute the tiles among the i/o ranks.
1770 for (i = 0; i < numRemoteRanks; ++i) {
1771 tileCounts[i] = count / numRemoteRanks;
1772 }
1773 // Deal with any remainder
1774 remainder = count - ((count / numRemoteRanks) * numRemoteRanks);
1775 for (i = 0; i < remainder; ++i) {
1776 int target = (computeIntracommRank + i) % numRemoteRanks;
1777 tileCounts[target] += 1;
1778 }
1779
1780 // Communicate these counts to the i/o processes for this
1781 // field. Note that we send a message to each process,
1782 // even if the count is zero.
1783 for (i = 0; i < numRemoteRanks; ++i) {
1784 MPI_Send(NULL, 0, MPI_BYTE, i, tileCounts[i],
1785 thisField->registrationIntercomm);
1786 }
1787
1788 } // field loop
1789 } // epoch-style loop
1790
1791 }
1792
1793
1794
1795
// Compute-side epoch bookkeeping (distinct from the i/o-side local
// currentEpochID inside ioRankMain).
int currentEpochID = 0;
int currentEpochStyle = 0;

// Called by the compute ranks to start i/o epoch 'newEpochID'
// (must be exactly currentEpochID + 1) with the given style and
// model iteration number.  Blocks until the i/o ranks have finished
// the previous epoch.
void
beginNewEpoch(int newEpochID, int gcmIter, int epochStyle)
{
    fieldInfoThisEpoch_t *p;

    // Epochs must be strictly sequential; a gap means tiles were lost.
    if (newEpochID != (currentEpochID + 1)) {
        fprintf(stderr, "ERROR: Missing i/o epoch? was %d, is now %d ??\n",
                currentEpochID, newEpochID);
        sleep(1); // Give the message a chance to propagate
        abort();
    }

    ////////////////////////////////////////////////////////////////////////
    // Need to be sure the i/o tasks are done processing the previous epoch
    // before any compute tasks start sending tiles from a new epoch.

    if (0 == computeIntracommRank) {
        int cmd[4] = { cmd_newEpoch, newEpochID, epochStyle, gcmIter };

        // Wait to get an ack that the i/o tasks are done with the prior epoch.
        MPI_Recv(NULL, 0, MPI_BYTE, 0, cmd_epochComplete,
                 globalIntercomm, MPI_STATUS_IGNORE);

        // Tell the i/o ranks about the new epoch.
        // (Just tell rank 0; it will bcast to the others)
        MPI_Send(cmd, 4, MPI_INT, 0, 0, globalIntercomm);
    }

    // Compute ranks wait here until rank 0 gets the ack from the i/o ranks
    MPI_Barrier(computeIntracomm);


    currentEpochID = newEpochID;
    currentEpochStyle = epochStyle;

    // Reset the tileCount (used by f3 to round-robin tiles over
    // the i/o ranks within this epoch)
    for (p = epochStyles[currentEpochStyle]; p->dataFieldID != '\0'; ++p) {
        p->tileCount = 0;
    }
}
1839
1840
1841 void
1842 f3(char dataFieldID, int tileID, int epochID, void *data)
1843 {
1844 int whichField, receiver, tag;
1845
1846 static char priorDataFieldID = '\0';
1847 static int priorEpoch = -1;
1848 static int remoteCommSize;
1849 static fieldInfoThisEpoch_t *p;
1850
1851 int flag=0;
1852
1853 // Check that this global has been set
1854 ASSERT(computeIntracommRank >= 0);
1855
1856
1857 // Figure out some info about this dataFieldID. It is
1858 // likely to be another tile from the same field as last time
1859 // we were called, in which case we can reuse the "static" values
1860 // retained from the prior call. If not, we need to look it up.
1861
1862 if ((dataFieldID != priorDataFieldID) || (epochID != priorEpoch)) {
1863
1864 // It's not the same; we need to look it up.
1865
1866 for (p = epochStyles[currentEpochStyle]; p->dataFieldID != '\0'; ++p) {
1867 if (p->dataFieldID == dataFieldID) break;
1868 }
1869 // Make sure we found a valid field
1870
1871 ASSERT(p->dataIntercomm != MPI_COMM_NULL);
1872
1873 MPI_Comm_remote_size(p->dataIntercomm, &remoteCommSize);
1874
1875 flag=1;
1876
1877 }
1878
1879 ASSERT(p->dataFieldID == dataFieldID);
1880
1881 receiver = (computeIntracommRank + p->tileCount) % remoteCommSize;
1882
1883 tag = (tileID << numCheckBits) | (epochID & bitMask);
1884
1885 //fprintf(stderr,"%d %d\n",flag,tileID);
1886
1887 /*
1888 if (tileID==189){
1889 int i,j;
1890 for (i=0;i<TILE_Y;++i){
1891 for (j=0;j<TILE_X;++j)
1892 fprintf(stderr,"%f ",((double*)data)[872+i*108+j]);
1893 fprintf(stderr,"\n");
1894 }
1895 }
1896
1897 */
1898
1899
1900
1901 FPRINTF(stderr,"Rank %d sends field '%c', tile %d, to i/o task %d with tag %d(%d)\n",
1902 computeIntracommRank, dataFieldID, tileID, receiver, tag, tag >> numCheckBits);
1903
1904 MPI_Send(data, tileOneZLevelSizeInBytes * p->zDepth,
1905 MPI_BYTE, receiver, tag, p->dataIntercomm);
1906
1907 p->tileCount += 1;
1908 priorDataFieldID = dataFieldID;
1909 priorEpoch = epochID;
1910 }
1911
1912
1913
1914 void
1915 f4(int epochID)
1916 {
1917 int i;
1918 ASSERT(computeIntracommRank >= 0);
1919
1920 if (0 == computeIntracommRank) {
1921 int cmd[3] = { cmd_exit, -1, -1 };
1922
1923 // Recv the ack that the prior i/o epoch is complete
1924 MPI_Recv(NULL, 0, MPI_BYTE, 0, cmd_epochComplete,
1925 globalIntercomm, MPI_STATUS_IGNORE);
1926
1927 // Tell the i/o ranks to exit
1928 // Just tell rank 0; it will bcast to the others
1929 MPI_Send(cmd, 3, MPI_INT, 0, 0, globalIntercomm);
1930 }
1931
1932 }
1933
1934
1935
1936 void myasyncio_set_global_sizes_(int *nx, int *ny, int *nz,
1937 int *nt, int *nb,
1938 int *tx, int *ty,
1939 int *ox, int *oy)
1940 {
1941
1942
1943 int data[] = {*nx,*ny,*nz,*nt,*nb,*tx,*ty,*ox,*oy};
1944
1945 int items = sizeof(data)/sizeof(int);
1946
1947
1948 NTILES = *nt; // total number of tiles
1949
1950
1951 TILE_X = *tx;
1952 TILE_Y = *ty;
1953 XGHOSTS = *ox;
1954 YGHOSTS = *oy;
1955 tileOneZLevelItemCount = ((TILE_X + 2*XGHOSTS) * (TILE_Y + 2*YGHOSTS));
1956 tileOneZLevelSizeInBytes = (tileOneZLevelItemCount * datumSize); // needed by compute ranks in f3
1957
1958
1959 int rank,ierr;
1960 MPI_Comm_rank(globalIntercomm,&rank);
1961
1962
1963 if (0==rank)
1964 printf("%d %d %d\n%d %d\n%d %d\n%d %d\n",*nx,*ny,*nz,*nt,*nb,*tx,*ty,*ox,*oy);
1965
1966
1967 /*
1968 if (0==rank)
1969 ierr=MPI_Bcast(&NTILES,1,MPI_INT,MPI_ROOT,globalIntercomm);
1970 else
1971 ierr=MPI_Bcast(&NTILES,1,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1972 */
1973
1974 if (0==rank)
1975 ierr=MPI_Bcast(data,items,MPI_INT,MPI_ROOT,globalIntercomm);
1976 else
1977 ierr=MPI_Bcast(data,items,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1978
1979 }
1980
1981 void asyncio_tile_arrays_(int *xoff, int *yoff, int *xskip)
1982 {
1983 int rank,ierr;
1984 MPI_Comm_rank(globalIntercomm,&rank);
1985
1986 if (0==rank)
1987 ierr=MPI_Bcast(xoff,NTILES,MPI_INT,MPI_ROOT,globalIntercomm);
1988 else
1989 ierr=MPI_Bcast(xoff,NTILES,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1990
1991 if (0==rank)
1992 ierr=MPI_Bcast(yoff,NTILES,MPI_INT,MPI_ROOT,globalIntercomm);
1993 else
1994 ierr=MPI_Bcast(yoff,NTILES,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1995
1996 if (0==rank)
1997 ierr=MPI_Bcast(xskip,NTILES,MPI_INT,MPI_ROOT,globalIntercomm);
1998 else
1999 ierr=MPI_Bcast(xskip,NTILES,MPI_INT,MPI_PROC_NULL,globalIntercomm);
2000
2001 }
2002
2003
2004
2005
2006
2007 //////////////////////////////////////////////////////////////////////
2008 // Fortran interface
2009
2010 void
2011 bron_f1(
2012 MPI_Fint *ptr_parentComm,
2013 int *ptr_numComputeRanks,
2014 int *ptr_totalNumTiles,
2015 MPI_Fint *rtnComputeComm)
2016 {
2017 // Convert the Fortran handle into a C handle
2018 MPI_Comm newComm, parentComm = MPI_Comm_f2c(*ptr_parentComm);
2019
2020 f1(parentComm, *ptr_numComputeRanks, *ptr_totalNumTiles, &newComm);
2021
2022 // Convert the C handle into a Fortran handle
2023 *rtnComputeComm = MPI_Comm_c2f(newComm);
2024 }
2025
2026
2027
// Fortran-callable wrapper for f2().  Fortran passes arguments by
// reference, so dereference before calling the C routine.
void
bron_f2(int *ptr_tileID)
{
    f2(*ptr_tileID);
}
2033
2034
// Fortran-callable wrapper for beginNewEpoch() (trailing underscore per
// the usual Fortran name-mangling convention; arguments by reference).
void
beginnewepoch_(int *newEpochID, int *gcmIter, int *epochStyle)
{
    beginNewEpoch(*newEpochID, *gcmIter, *epochStyle);
}
2040
2041
// Fortran-callable wrapper for f3(): send one tile of field data.
// Scalar arguments arrive by reference from Fortran; `data` is the
// tile buffer itself and is passed through unchanged.
void
bron_f3(char *ptr_dataFieldID, int *ptr_tileID, int *ptr_epochID, void *data)
{
    f3(*ptr_dataFieldID, *ptr_tileID, *ptr_epochID, data);
}
2047
2048
2049
// Fortran-callable wrapper for f4(): tell the i/o ranks to shut down.
void
bron_f4(int *ptr_epochID)
{
    f4(*ptr_epochID);
}
2055

  ViewVC Help
Powered by ViewVC 1.1.22