// Code to do the m-on-n fan-in, recompositing, and output of data
// tiles for mitGCM.
//
// There are aspects of this code that would be simpler in C++, but
// we deliberately wrote the code in ANSI C to make linking it with
// the Fortran main code easier (it should be possible to just include
// the .o on the link line).

#include "PACKAGES_CONFIG.h"

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#include <mpi.h>
#include <alloca.h>

#include <stdint.h>
#include <errno.h>

#define DEBUG 1

#if (DEBUG >= 2)
#define FPRINTF fprintf
#else
#include <stdarg.h>
void FPRINTF(FILE *fp,...){return;}
#endif


// Define our own version of "assert", where we sleep rather than abort.
// This makes it easier to attach the debugger.
#if (DEBUG >= 1)
#define ASSERT(_expr)  \
    if (!(_expr)) {  \
        fprintf(stderr, "ASSERT failed for pid %d : `%s': %s: %d: %s\n",  \
                getpid(), #_expr, __FILE__, __LINE__, __func__);  \
        sleep(9999);  \
    }
#else
#include <assert.h>
#define ASSERT assert
#endif
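
// Typical use when an ASSERT trips: the failing rank prints its pid and then
// sleeps, so on the node where it is running one can attach a debugger, e.g.
//     gdb -p <pid>
// and inspect the stack before killing the job.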


// If numRanksPerNode is set to be > 0, we just use that setting.
// If it is <= 0, we dynamically determine the number of cores on
// a node, and use that.  (N.B.: we assume *all* nodes being used in
// the run have the *same* number of cores.)
int numRanksPerNode = 0;

// The low-order numCheckBits of each data-message tag carry an epoch sanity
// check; this can be set to zero if you're confident it's all correct.
#define numCheckBits 2
// This bitMask definition works for 2's complement
#define bitMask ((1 << numCheckBits) - 1)
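
// Tag layout for tile data messages, as decoded in tryToReceiveDataTile():
// the compute side is assumed to build each tag as
//     tag = (tileID << numCheckBits) | (epochID & bitMask)
// so the receiver can recover the tileID (tag >> numCheckBits) and cross-check
// the low bits against the current epoch.  For example, with numCheckBits == 2,
// tile 37 sent during epoch 5 is tagged (37 << 2) | (5 & 3) == 149.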
| 63 |
|
| 64 |
|
| 65 |
/////////////////////////////////////////////////////////////////////// |
| 66 |
// Info about the data fields |
| 67 |
|
| 68 |
// double for now. Might also be float someday |
| 69 |
#define datumType double |
| 70 |
#define datumSize (sizeof(datumType)) |
| 71 |
|
| 72 |
|
| 73 |
// Info about the data fields.  We assume that all the fields have the
// same X and Y characteristics, but may have 1, NUM_Z, or MULTDIM levels.
//
// The following values are required here.  They could instead be sent
// over or read in, with some rearrangement of the init routines.
//
#define NUM_X 2160
#define NUM_Y 28080L    // get rid of this someday
#define NUM_Z 90
#define MULTDIM 7
#define twoDFieldSizeInBytes    (NUM_X * NUM_Y * 1 * datumSize)
#define threeDFieldSizeInBytes  (twoDFieldSizeInBytes * NUM_Z)
#define multDFieldSizeInBytes   (twoDFieldSizeInBytes * MULTDIM)
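
// For scale, with the sizes above: one X-Y level is 2160 * 28080 = 60,652,800
// points, so twoDFieldSizeInBytes is about 0.49 GB, threeDFieldSizeInBytes
// (90 levels) is about 43.7 GB, and multDFieldSizeInBytes (7 levels) is about
// 3.4 GB per field.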
| 86 |
|
| 87 |
// Info about the data tiles. We assume that all the tiles are the same |
| 88 |
// size (no odd-sized last piece), they all have the same X and Y |
| 89 |
// characteristics (including ghosting), and are full depth in Z |
| 90 |
// (either 1, or NUM_Z, or MULTDIM, as appropriate). |
| 91 |
// |
| 92 |
// all these data now sent over from compute ranks |
| 93 |
// |
| 94 |
int TILE_X = -1; |
| 95 |
int TILE_Y = -1; |
| 96 |
int XGHOSTS = -1; |
| 97 |
int YGHOSTS = -1; |
| 98 |
|
| 99 |
// Size of one Z level of a tile (NOT one Z level of a whole field) |
| 100 |
int tileOneZLevelItemCount = -1; |
| 101 |
int tileOneZLevelSizeInBytes = -1; |
| 102 |
|
| 103 |
|
| 104 |
typedef struct dataFieldDepth { |
| 105 |
char dataFieldID; |
| 106 |
int numZ; |
| 107 |
} dataFieldDepth_t; |
| 108 |
|
| 109 |
dataFieldDepth_t fieldDepths[] = { |
| 110 |
{ 'A', MULTDIM }, // seaice, 7 == MULTDIM in SEAICE_SIZE.h |
| 111 |
|
| 112 |
{ 'B', 1 }, |
| 113 |
{ 'C', 1 }, |
| 114 |
{ 'D', 1 }, |
| 115 |
{ 'E', 1 }, |
| 116 |
{ 'F', 1 }, |
| 117 |
{ 'G', 1 }, |
| 118 |
{ 'H', 1 }, |
| 119 |
{ 'I', 1 }, |
| 120 |
{ 'J', 1 }, |
| 121 |
{ 'K', 1 }, |
| 122 |
{ 'L', 1 }, |
| 123 |
{ 'M', 1 }, |
| 124 |
{ 'N', 1 }, |
| 125 |
{ 'O', 1 }, |
| 126 |
{ 'P', 1 }, |
| 127 |
{ 'Q', 1 }, |
| 128 |
{ 'R', 1 }, |
| 129 |
|
| 130 |
{ 'S', NUM_Z }, |
| 131 |
{ 'T', NUM_Z }, |
| 132 |
{ 'U', NUM_Z }, |
| 133 |
{ 'V', NUM_Z }, |
| 134 |
{ 'W', NUM_Z }, |
| 135 |
{ 'X', NUM_Z }, |
| 136 |
{ 'Y', NUM_Z }, |
| 137 |
}; |
| 138 |
#define numAllFields (sizeof(fieldDepths)/sizeof(dataFieldDepth_t)) |
| 139 |
|
| 140 |
|
| 141 |
|
| 142 |
/////////////////////////////////////////////////////////////////////// |
| 143 |
// Info about the various kinds of i/o epochs |
| 144 |
typedef struct epochFieldInfo {
    char dataFieldID;
    MPI_Comm registrationIntercomm;
    MPI_Comm dataIntercomm;
    MPI_Comm ioRanksIntracomm;
    int tileCount;
    int zDepth;    // duplicates the fieldDepth entry; filled in automatically
    char filenameTemplate[128];
    long int offset;
    int pickup;
} fieldInfoThisEpoch_t;
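
// Conventions for the per-epoch-style tables below: each table ends with a
// sentinel entry whose dataFieldID is '\0'; the communicators and tileCount
// are filled in at run time; zDepth is copied from fieldDepths[] during init;
// and for pickup-style epochs, "offset" is the byte offset of the field
// within the single combined pickup file.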
| 155 |
|
| 156 |
// The normal i/o dump |
| 157 |
fieldInfoThisEpoch_t fieldsForEpochStyle_0[] = { |
| 158 |
{ 'U', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "U.%010d.%s", 0, 0 }, |
| 159 |
{ 'V', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "V.%010d.%s", 0, 0 }, |
| 160 |
{ 'W', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "W.%010d.%s", 0, 0 }, |
| 161 |
{ 'S', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "Salt.%010d.%s", 0, 0 }, |
| 162 |
{ 'T', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "Theta.%010d.%s", 0,0 }, |
| 163 |
{ 'N', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "Eta.%010d.%s", 0,0 }, |
| 164 |
|
| 165 |
{ 'B', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIarea.%010d.%s", 0,0 }, |
| 166 |
{ 'C', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIheff.%010d.%s", 0,0 }, |
| 167 |
{ 'D', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIhsnow.%010d.%s", 0,0 }, |
| 168 |
{ 'E', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIuice.%010d.%s", 0,0 }, |
| 169 |
{ 'F', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIvice.%010d.%s", 0,0 }, |
| 170 |
{ 'G', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIhsalt.%010d.%s", 0,0 }, |
| 171 |
|
| 172 |
//{ 'H', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "EtaHnm1.%010d.%s", 0,0 }, |
| 173 |
{ 'I', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceTAUX.%010d.%s", 0,0 }, |
| 174 |
{ 'J', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceTAUY.%010d.%s", 0,0 }, |
| 175 |
{ 'K', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "KPPhbl.%010d.%s", 0,0 }, |
| 176 |
{ 'L', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceSflux.%010d.%s", 0,0 }, |
| 177 |
|
| 178 |
{ 'M', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceFWflx.%010d.%s", 0,0 }, |
| 179 |
{ 'O', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceQnet.%010d.%s", 0,0 }, |
| 180 |
{ 'P', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "PhiBot.%010d.%s", 0,0 }, |
| 181 |
{ 'Q', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceQsw.%010d.%s", 0,0 }, |
| 182 |
//{ 'R', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "dEtaHdt.%010d.%s", 0,0 }, |
| 183 |
|
| 184 |
{'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "", 0,0 }, |
| 185 |
}; |
| 186 |
|
| 187 |
|
| 188 |
// pickup file |
| 189 |
fieldInfoThisEpoch_t fieldsForEpochStyle_1[] = { |
| 190 |
{ 'U', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 0 + twoDFieldSizeInBytes * 0, 1}, |
| 191 |
{ 'V', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 0, 1}, |
| 192 |
{ 'T', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 2 + twoDFieldSizeInBytes * 0, 1}, |
| 193 |
{ 'S', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 3 + twoDFieldSizeInBytes * 0, 1}, |
| 194 |
{ 'X', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 4 + twoDFieldSizeInBytes * 0, 1}, |
| 195 |
{ 'Y', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 5 + twoDFieldSizeInBytes * 0, 1}, |
| 196 |
{ 'N', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 0, 1}, |
| 197 |
{ 'R', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 1, 1}, |
| 198 |
{ 'H', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 2, 1}, |
| 199 |
{'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "", 0 ,1}, |
| 200 |
}; |
| 201 |
|
| 202 |
|
| 203 |
// seaice pickup |
| 204 |
fieldInfoThisEpoch_t fieldsForEpochStyle_2[] = { |
| 205 |
{ 'A', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 0 + twoDFieldSizeInBytes * 0, 2}, |
| 206 |
{ 'B', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 0, 2}, |
| 207 |
{ 'C', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 1, 2}, |
| 208 |
{ 'D', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 2, 2}, |
| 209 |
{ 'G', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 3, 2}, |
| 210 |
{ 'E', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 4, 2}, |
| 211 |
{ 'F', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 5, 2}, |
| 212 |
{'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "", 0 }, |
| 213 |
}; |
| 214 |
|
| 215 |
|
| 216 |
fieldInfoThisEpoch_t *epochStyles[] = { |
| 217 |
fieldsForEpochStyle_0, |
| 218 |
fieldsForEpochStyle_1, |
| 219 |
fieldsForEpochStyle_2, |
| 220 |
}; |
| 221 |
int numEpochStyles = (sizeof(epochStyles) / sizeof(fieldInfoThisEpoch_t *)); |
| 222 |
|
| 223 |
|
| 224 |
typedef enum {
    cmd_illegal,
    cmd_newEpoch,
    cmd_epochComplete,
    cmd_exit,
} epochCmd_t;
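
// Epoch-control protocol (as used in ioRankMain): i/o rank 0 signals the
// completion of the previous epoch by sending a zero-length message whose
// *tag* is cmd_epochComplete over globalIntercomm, then receives an int[4]
// command { cmd, epochID, epochStyleIndex, gcmIter } from compute rank 0 and
// broadcasts it to the other i/o ranks over ioIntracomm.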
| 230 |
|
| 231 |
|
| 232 |
/////////////////////////////////////////////////////////////////////// |
| 233 |
|
| 234 |
// Note that a rank will only have access to one of the Intracomms, |
| 235 |
// but all ranks will define the Intercomm. |
| 236 |
MPI_Comm ioIntracomm = MPI_COMM_NULL; |
| 237 |
int ioIntracommRank = -1; |
| 238 |
MPI_Comm computeIntracomm = MPI_COMM_NULL; |
| 239 |
int computeIntracommRank = -1; |
| 240 |
MPI_Comm globalIntercomm = MPI_COMM_NULL; |
| 241 |
|
| 242 |
|
| 243 |
#define divCeil(_x,_y)  (((_x) + ((_y) - 1)) / (_y))
#define roundUp(_x,_y)  (divCeil((_x),(_y)) * (_y))
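
// e.g. divCeil(90, 8) == 12 (plain integer division would give 11) and
// roundUp(90, 8) == 96; doNewEpoch() uses divCeil() to split a field's
// zDepth levels across the i/o ranks in its group.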
| 245 |
|
| 246 |
|
| 247 |
|
| 248 |
typedef enum { |
| 249 |
bufState_illegal, |
| 250 |
bufState_Free, |
| 251 |
bufState_InUse, |
| 252 |
} bufState_t; |
| 253 |
|
| 254 |
|
| 255 |
typedef struct buf_header {
    struct buf_header *next;
    bufState_t state;
    int requestsArraySize;
    MPI_Request *requests;
    uint64_t ck0;
    char *payload;   // Max payload size is tileOneZLevelSizeInBytes * NUM_Z; the tile
    uint64_t ck1;    // dimensions are received from the compute ranks, so the payload
} bufHdr_t;          // is allocated dynamically in allocateTileBufs().
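
// ck0/ck1 are guard words: allocateTileBufs() sets both to 0xdeadbeefdeadbeef
// and tryToReceiveDataTile() re-checks them after every receive.  Note that
// since payload is now a separately malloc'ed pointer, the guards bracket the
// header fields (catching header corruption) rather than the payload buffer
// itself.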
| 264 |
|
| 265 |
|
| 266 |
bufHdr_t *freeTileBufs_ptr = NULL; |
| 267 |
bufHdr_t *inUseTileBufs_ptr = NULL; |
| 268 |
|
| 269 |
int maxTagValue = -1; |
| 270 |
int totalNumTiles = -1; |
| 271 |
|
| 272 |
// Routine to convert double to float during a memcpy-style copy.
// (Byte swapping still needs to be added here as well.)
void
memcpy_r8_2_r4 (float *f, double *d, long long *n)
{
| 276 |
long long i, rem; |
| 277 |
rem = *n%16LL; |
| 278 |
for (i = 0; i < rem; i++) { |
| 279 |
f [i] = d [i]; |
| 280 |
} |
| 281 |
for (i = rem; i < *n; i += 16) { |
| 282 |
__asm__ __volatile__ ("prefetcht0 %0 # memcpy_r8_2_r4.c 10" : : "m" (d [i + 256 + 0]) ); |
| 283 |
__asm__ __volatile__ ("prefetcht0 %0 # memcpy_r8_2_r4.c 11" : : "m" (f [i + 256 + 0]) ); |
| 284 |
__asm__ __volatile__ ("cvtsd2ss %0, %%xmm0 # memcpy_r8_2_r4.c 12" : : "m" (d [i + 0]) : "%xmm0"); |
| 285 |
__asm__ __volatile__ ("movss %%xmm0, %0 # memcpy_r8_2_r4.c 13" : "=m" (f [i + 0]) : : "memory"); |
| 286 |
__asm__ __volatile__ ("cvtsd2ss %0, %%xmm1 # memcpy_r8_2_r4.c 14" : : "m" (d [i + 1]) : "%xmm1"); |
| 287 |
__asm__ __volatile__ ("movss %%xmm1, %0 # memcpy_r8_2_r4.c 15" : "=m" (f [i + 1]) : : "memory"); |
| 288 |
__asm__ __volatile__ ("cvtsd2ss %0, %%xmm2 # memcpy_r8_2_r4.c 16" : : "m" (d [i + 2]) : "%xmm2"); |
| 289 |
__asm__ __volatile__ ("movss %%xmm2, %0 # memcpy_r8_2_r4.c 17" : "=m" (f [i + 2]) : : "memory"); |
| 290 |
__asm__ __volatile__ ("cvtsd2ss %0, %%xmm3 # memcpy_r8_2_r4.c 18" : : "m" (d [i + 3]) : "%xmm3"); |
| 291 |
__asm__ __volatile__ ("movss %%xmm3, %0 # memcpy_r8_2_r4.c 19" : "=m" (f [i + 3]) : : "memory"); |
| 292 |
__asm__ __volatile__ ("cvtsd2ss %0, %%xmm4 # memcpy_r8_2_r4.c 20" : : "m" (d [i + 4]) : "%xmm4"); |
| 293 |
__asm__ __volatile__ ("movss %%xmm4, %0 # memcpy_r8_2_r4.c 21" : "=m" (f [i + 4]) : : "memory"); |
| 294 |
__asm__ __volatile__ ("cvtsd2ss %0, %%xmm5 # memcpy_r8_2_r4.c 22" : : "m" (d [i + 5]) : "%xmm5"); |
| 295 |
__asm__ __volatile__ ("movss %%xmm5, %0 # memcpy_r8_2_r4.c 23" : "=m" (f [i + 5]) : : "memory"); |
| 296 |
__asm__ __volatile__ ("cvtsd2ss %0, %%xmm6 # memcpy_r8_2_r4.c 24" : : "m" (d [i + 6]) : "%xmm6"); |
| 297 |
__asm__ __volatile__ ("movss %%xmm6, %0 # memcpy_r8_2_r4.c 25" : "=m" (f [i + 6]) : : "memory"); |
| 298 |
__asm__ __volatile__ ("cvtsd2ss %0, %%xmm7 # memcpy_r8_2_r4.c 26" : : "m" (d [i + 7]) : "%xmm7"); |
| 299 |
__asm__ __volatile__ ("prefetcht0 %0 # memcpy_r8_2_r4.c 27" : : "m" (d [i + 256 + 8 + 0]) ); |
| 300 |
__asm__ __volatile__ ("movss %%xmm7, %0 # memcpy_r8_2_r4.c 28" : "=m" (f [i + 7]) : : "memory"); |
| 301 |
__asm__ __volatile__ ("cvtsd2ss %0, %%xmm8 # memcpy_r8_2_r4.c 29" : : "m" (d [i + 8]) : "%xmm8"); |
| 302 |
__asm__ __volatile__ ("movss %%xmm8, %0 # memcpy_r8_2_r4.c 30" : "=m" (f [i + 8]) : : "memory"); |
| 303 |
__asm__ __volatile__ ("cvtsd2ss %0, %%xmm9 # memcpy_r8_2_r4.c 31" : : "m" (d [i + 9]) : "%xmm9"); |
| 304 |
__asm__ __volatile__ ("movss %%xmm9, %0 # memcpy_r8_2_r4.c 32" : "=m" (f [i + 9]) : : "memory"); |
| 305 |
__asm__ __volatile__ ("cvtsd2ss %0, %%xmm10 # memcpy_r8_2_r4.c 33" : : "m" (d [i + 10]) : "%xmm10"); |
| 306 |
__asm__ __volatile__ ("movss %%xmm10, %0 # memcpy_r8_2_r4.c 34" : "=m" (f [i + 10]) : : "memory"); |
| 307 |
__asm__ __volatile__ ("cvtsd2ss %0, %%xmm11 # memcpy_r8_2_r4.c 35" : : "m" (d [i + 11]) : "%xmm11"); |
| 308 |
__asm__ __volatile__ ("movss %%xmm11, %0 # memcpy_r8_2_r4.c 36" : "=m" (f [i + 11]) : : "memory"); |
| 309 |
__asm__ __volatile__ ("cvtsd2ss %0, %%xmm12 # memcpy_r8_2_r4.c 37" : : "m" (d [i + 12]) : "%xmm12"); |
| 310 |
__asm__ __volatile__ ("movss %%xmm12, %0 # memcpy_r8_2_r4.c 38" : "=m" (f [i + 12]) : : "memory"); |
| 311 |
__asm__ __volatile__ ("cvtsd2ss %0, %%xmm13 # memcpy_r8_2_r4.c 39" : : "m" (d [i + 13]) : "%xmm13"); |
| 312 |
__asm__ __volatile__ ("movss %%xmm13, %0 # memcpy_r8_2_r4.c 40" : "=m" (f [i + 13]) : : "memory"); |
| 313 |
__asm__ __volatile__ ("cvtsd2ss %0, %%xmm14 # memcpy_r8_2_r4.c 41" : : "m" (d [i + 14]) : "%xmm14"); |
| 314 |
__asm__ __volatile__ ("movss %%xmm14, %0 # memcpy_r8_2_r4.c 42" : "=m" (f [i + 14]) : : "memory"); |
| 315 |
__asm__ __volatile__ ("cvtsd2ss %0, %%xmm15 # memcpy_r8_2_r4.c 43" : : "m" (d [i + 15]) : "%xmm15"); |
| 316 |
__asm__ __volatile__ ("movss %%xmm15, %0 # memcpy_r8_2_r4.c 44" : "=m" (f [i + 15]) : : "memory"); |
| 317 |
} |
| 318 |
} |
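
// For reference: aside from the prefetches, the SSE sequence above is just a
// hand-unrolled version of the plain conversion loop
//     for (i = rem; i < *n; ++i) f[i] = (float) d[i];
// (a portability note only; the unrolled form is what is used here).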
| 319 |
|
| 320 |
// Debug routine: check that the free and in-use buffer lists still account
// for all of the tile buffers originally allocated.
void
countBufs(int nbufs)
{
| 323 |
int nInUse, nFree; |
| 324 |
bufHdr_t *bufPtr; |
| 325 |
static int target = -1; |
| 326 |
|
| 327 |
if (-1 == target) { |
| 328 |
ASSERT(-1 != nbufs); |
| 329 |
target = nbufs; |
| 330 |
} |
| 331 |
|
| 332 |
bufPtr = freeTileBufs_ptr; |
| 333 |
for (nFree = 0; bufPtr != NULL; bufPtr = bufPtr->next) nFree += 1; |
| 334 |
|
| 335 |
bufPtr = inUseTileBufs_ptr; |
| 336 |
for (nInUse = 0; bufPtr != NULL; bufPtr = bufPtr->next) nInUse += 1; |
| 337 |
|
| 338 |
if (nInUse + nFree != target) { |
| 339 |
fprintf(stderr, "Rank %d: bad number of buffs: free %d, inUse %d, should be %d\n", |
| 340 |
ioIntracommRank, nFree, nInUse, target); |
| 341 |
sleep(5000); |
| 342 |
} |
| 343 |
} |
| 344 |
|
| 345 |
int readn(int fd, void *p, int nbytes) |
| 346 |
{ |
| 347 |
|
| 348 |
char *ptr = (char*)(p); |
| 349 |
|
| 350 |
int nleft, nread; |
| 351 |
|
| 352 |
nleft = nbytes; |
| 353 |
while (nleft > 0){ |
| 354 |
nread = read(fd, ptr, nleft); |
| 355 |
if (nread < 0) |
| 356 |
return(nread); // error |
| 357 |
else if (nread == 0) |
| 358 |
break; // EOF |
| 359 |
|
| 360 |
nleft -= nread; |
| 361 |
ptr += nread; |
| 362 |
} |
| 363 |
return (nbytes - nleft); |
| 364 |
} |
| 365 |
|
| 366 |
|
| 367 |
ssize_t writen(int fd, void *p, size_t nbytes) |
| 368 |
{ |
| 369 |
char *ptr = (char*)(p); |
| 370 |
|
| 371 |
size_t nleft; |
| 372 |
ssize_t nwritten; |
| 373 |
|
| 374 |
nleft = nbytes; |
| 375 |
while (nleft > 0){ |
| 376 |
nwritten = write(fd, ptr, nleft); |
| 377 |
if (nwritten <= 0){ |
| 378 |
if (errno==EINTR) continue; // POSIX, not SVr4 |
| 379 |
return(nwritten); // non-EINTR error |
| 380 |
} |
| 381 |
|
| 382 |
nleft -= nwritten; |
| 383 |
ptr += nwritten; |
| 384 |
} |
| 385 |
return(nbytes - nleft); |
| 386 |
} |
| 387 |
|
| 388 |
|
| 389 |
void write_pickup_meta(FILE *fp, int gcmIter, int pickup) |
| 390 |
{ |
| 391 |
int i; |
| 392 |
int ndims = 2; |
| 393 |
int nrecords,nfields; |
| 394 |
char**f; |
| 395 |
char*fld1[] = {"Uvel","Vvel","Theta","Salt","GuNm1","GvNm1","EtaN","dEtaHdt","EtaH"}; |
| 396 |
char*fld2[] = {"siTICES","siAREA","siHEFF","siHSNOW","siHSALT","siUICE","siVICE"}; |
| 397 |
// for now, just list the fields here. When the whole field specification apparatus |
| 398 |
// is cleaned up, pull the names out of the epochstyle definition or whatever |
| 399 |
|
| 400 |
ASSERT(1==pickup||2==pickup); |
| 401 |
|
| 402 |
if (1==pickup){ |
| 403 |
nrecords = 6*NUM_Z+3; |
| 404 |
nfields = sizeof(fld1)/sizeof(char*); |
| 405 |
f = fld1; |
| 406 |
} |
| 407 |
else if (2==pickup){ |
| 408 |
nrecords = MULTDIM+6; |
| 409 |
nfields = sizeof(fld2)/sizeof(char*); |
| 410 |
f = fld2; |
| 411 |
} |
| 412 |
|
| 413 |
fprintf(fp," nDims = [ %3d ];\n",ndims); |
| 414 |
fprintf(fp," dimList = [\n"); |
| 415 |
fprintf(fp," %10u,%10d,%10u,\n",NUM_X,1,NUM_X); |
| 416 |
fprintf(fp," %10ld,%10d,%10ld\n",NUM_Y,1,NUM_Y); |
| 417 |
fprintf(fp," ];\n"); |
| 418 |
fprintf(fp," dataprec = [ 'float64' ];\n"); |
| 419 |
fprintf(fp," nrecords = [ %5d ];\n",nrecords); |
| 420 |
fprintf(fp," timeStepNumber = [ %10d ];\n",gcmIter); |
| 421 |
fprintf(fp," timeInterval = [ %19.12E ];\n",0.0); // what should this be? |
| 422 |
fprintf(fp," nFlds = [ %4d ];\n",nfields); |
| 423 |
fprintf(fp," fldList = {\n"); |
| 424 |
for (i=0;i<nfields;++i) |
| 425 |
fprintf(fp," '%-8s'",f[i]); |
| 426 |
fprintf(fp,"\n };\n"); |
| 427 |
} |
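
// For illustration (values plugged into the formats above), a pickup==1 meta
// file written for, say, gcmIter 72 starts out roughly as:
//     nDims = [   2 ];
//     dimList = [
//           2160,         1,      2160,
//          28080,         1,     28080
//     ];
//     dataprec = [ 'float64' ];
//     nrecords = [   543 ];
//     timeStepNumber = [         72 ];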
| 428 |
|
| 429 |
|
| 430 |
|
| 431 |
double *outBuf=NULL;//[NUM_X*NUM_Y*NUM_Z]; // only needs to be myNumZSlabs |
| 432 |
size_t outBufSize=0; |
| 433 |
|
| 434 |
|
| 435 |
void |
| 436 |
do_write(int io_epoch, fieldInfoThisEpoch_t *whichField, int firstZ, int numZ, int gcmIter) |
| 437 |
{ |
| 438 |
if (0==numZ) return; // this is *not* global NUM_Z : change name of parameter to avoid grief! |
| 439 |
|
| 440 |
int pickup = whichField->pickup; |
| 441 |
|
| 442 |
/////////////////////////////// |
| 443 |
// swap here, if necessary |
| 444 |
// int i,j; |
| 445 |
|
| 446 |
//i = NUM_X*NUM_Y*numZ; |
| 447 |
//mds_byteswapr8_(&i,outBuf); |
| 448 |
|
| 449 |
// mds_byteswapr8 expects an integer count, which is gonna overflow someday |
| 450 |
// can't redefine to long without affecting a bunch of other calls |
| 451 |
// so do a slab at a time here, to delay the inevitable |
| 452 |
// i = NUM_X*NUM_Y; |
| 453 |
//for (j=0;j<numZ;++j) |
| 454 |
// mds_byteswapr8_(&i,&outBuf[i*j]); |
| 455 |
|
| 456 |
// gnu builtin evidently honored by intel compilers |
| 457 |
|
| 458 |
if (pickup) { |
| 459 |
uint64_t *alias = (uint64_t*)outBuf; |
| 460 |
size_t i; |
| 461 |
for (i=0;i<NUM_X*NUM_Y*numZ;++i) |
| 462 |
alias[i] = __builtin_bswap64(alias[i]); |
| 463 |
} |
| 464 |
else { |
| 465 |
uint32_t *alias = (uint32_t*)outBuf; |
| 466 |
size_t i; |
| 467 |
for (i=0;i<NUM_X*NUM_Y*numZ;++i) |
| 468 |
alias[i] = __builtin_bswap32(alias[i]); |
| 469 |
} |
| 470 |
|
| 471 |
// end of swappiness |
| 472 |
////////////////////////////////// |
| 473 |
|
| 474 |
char s[1024]; |
| 475 |
//sprintf(s,"henze_%d_%d_%c.dat",io_epoch,gcmIter,whichField->dataFieldID); |
| 476 |
|
| 477 |
sprintf(s,whichField->filenameTemplate,gcmIter,"data"); |
| 478 |
|
| 479 |
int fd = open(s,O_CREAT|O_WRONLY,S_IRWXU|S_IRGRP); |
| 480 |
ASSERT(fd!=-1); |
| 481 |
|
| 482 |
size_t nbytes; |
| 483 |
|
| 484 |
if (pickup) { |
| 485 |
lseek(fd,whichField->offset,SEEK_SET); |
| 486 |
lseek(fd,firstZ*NUM_X*NUM_Y*datumSize,SEEK_CUR); |
| 487 |
nbytes = NUM_X*NUM_Y*numZ*datumSize; |
| 488 |
} |
| 489 |
else { |
| 490 |
lseek(fd,firstZ*NUM_X*NUM_Y*sizeof(float),SEEK_CUR); |
| 491 |
nbytes = NUM_X*NUM_Y*numZ*sizeof(float); |
| 492 |
} |
| 493 |
|
| 494 |
ssize_t bwrit = writen(fd,outBuf,nbytes); |
| 495 |
|
| 496 |
if (-1==bwrit) perror("Henze : file write problem : "); |
| 497 |
|
| 498 |
FPRINTF(stderr,"Wrote %d of %d bytes (%d -> %d) \n",bwrit,nbytes,firstZ,numZ); |
| 499 |
|
| 500 |
// ASSERT(nbytes == bwrit); |
| 501 |
|
| 502 |
if (nbytes!=bwrit) |
| 503 |
fprintf(stderr,"WROTE %ld /%ld\n",bwrit,nbytes); |
| 504 |
|
| 505 |
close(fd); |
| 506 |
|
| 507 |
|
| 508 |
return; |
| 509 |
} |
| 510 |
|
| 511 |
|
| 512 |
int NTILES = -1; |
| 513 |
|
| 514 |
typedef struct {
    int off;
    int skip;
} tile_layout_t;

tile_layout_t *offsetTable;
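
// offsetTable is indexed by 1-based tileID (it is allocated and filled in
// ioRankMain from the xoff/yoff/xskip arrays broadcast by the compute side):
//   .off  is the element offset of the tile's first interior point within one
//         X-Y level of the output buffer, i.e. NUM_X*(yoff-1) + (xoff-1);
//   .skip is the row stride (in elements) used when copying the tile's rows
//         into that buffer (see processSlabSection).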
| 520 |
|
| 521 |
void |
| 522 |
processSlabSection( |
| 523 |
fieldInfoThisEpoch_t *whichField, |
| 524 |
int tileID, |
| 525 |
void *data, |
| 526 |
int myNumZSlabs) |
| 527 |
{ |
| 528 |
int intracommSize,intracommRank; |
| 529 |
// MPI_Comm_size(whichField->ioRanksIntracomm, &intracommSize); |
| 530 |
MPI_Comm_rank(whichField->ioRanksIntracomm, &intracommRank); |
| 531 |
//printf("i/o rank %d/%d recv'd %d::%d (%d->%d)\n",intracommRank,intracommSize,whichField,tileID,firstZ,lastZ); |
| 532 |
// printf("rank %d : tile %d is gonna go at %d and stride with %d, z = %d\n",intracommRank,tileID,offsetTable[tileID].off, |
| 533 |
// offsetTable[tileID].skip,myNumZSlabs); |
| 534 |
|
| 535 |
int z; |
| 536 |
|
| 537 |
int pickup = whichField->pickup; |
| 538 |
|
| 539 |
//ASSERT((tileID > 0) && (tileID < (sizeof(offsetTable)/sizeof(tile_layout_t)))); |
| 540 |
ASSERT( (tileID > 0) && (tileID <= NTILES) ); // offsetTable now dynamically allocated |
| 541 |
|
| 542 |
if (myNumZSlabs * twoDFieldSizeInBytes > outBufSize){ |
| 543 |
|
| 544 |
free(outBuf); |
| 545 |
|
| 546 |
outBufSize = myNumZSlabs * twoDFieldSizeInBytes; |
| 547 |
|
| 548 |
outBuf = malloc(outBufSize); |
| 549 |
ASSERT(outBuf); |
| 550 |
|
| 551 |
memset(outBuf,0,outBufSize); |
| 552 |
} |
| 553 |
|
| 554 |
for (z=0;z<myNumZSlabs;++z){ |
| 555 |
|
| 556 |
off_t zoffset = z*TILE_X*TILE_Y*NTILES; //NOT totalNumTiles; |
| 557 |
off_t hoffset = offsetTable[tileID].off; |
| 558 |
off_t skipdst = offsetTable[tileID].skip; |
| 559 |
|
| 560 |
//double *dst = outBuf + zoffset + hoffset; |
| 561 |
void *dst; |
| 562 |
if (pickup) |
| 563 |
dst = outBuf + zoffset + hoffset; |
| 564 |
else |
| 565 |
dst = (float*)outBuf + zoffset + hoffset; |
| 566 |
|
| 567 |
off_t zoff = z*(TILE_X+2*XGHOSTS)*(TILE_Y+2*YGHOSTS); |
| 568 |
off_t hoff = YGHOSTS*(TILE_X+2*XGHOSTS) + YGHOSTS; |
| 569 |
double *src = (double*)data + zoff + hoff; |
| 570 |
|
| 571 |
off_t skipsrc = TILE_X+2*XGHOSTS; |
| 572 |
|
| 573 |
//fprintf(stderr,"rank %d tile %d offset %d skip %d dst %x src %x\n",intracommRank,tileID,hoffset,skipdst,dst,src); |
| 574 |
|
| 575 |
long long n = TILE_X; |
| 576 |
|
| 577 |
int y; |
| 578 |
if (pickup) |
| 579 |
for (y=0;y<TILE_Y;++y) |
| 580 |
memcpy((double*)dst + y * skipdst, src + y * skipsrc, TILE_X*datumSize); |
| 581 |
else |
| 582 |
for (y=0;y<TILE_Y;++y) |
| 583 |
memcpy_r8_2_r4((float*)dst + y * skipdst, src + y * skipsrc, &n); |
| 584 |
} |
| 585 |
|
| 586 |
return; |
| 587 |
} |
| 588 |
|
| 589 |
|
| 590 |
|
| 591 |
// Allocate some buffers to receive tile-data from the compute ranks |
| 592 |
void |
| 593 |
allocateTileBufs(int numTileBufs, int maxIntracommSize) |
| 594 |
{ |
| 595 |
|
| 596 |
ASSERT(tileOneZLevelSizeInBytes>0); // be sure we have rec'vd values by now |
| 597 |
|
| 598 |
int i, j; |
| 599 |
for (i = 0; i < numTileBufs; ++i) { |
| 600 |
|
| 601 |
bufHdr_t *newBuf = malloc(sizeof(bufHdr_t)); |
| 602 |
ASSERT(NULL != newBuf); |
| 603 |
|
| 604 |
newBuf->payload = malloc(tileOneZLevelSizeInBytes * NUM_Z); |
| 605 |
ASSERT(NULL != newBuf->payload); |
| 606 |
|
| 607 |
newBuf->requests = malloc(maxIntracommSize * sizeof(MPI_Request)); |
| 608 |
ASSERT(NULL != newBuf->requests); |
| 609 |
|
| 610 |
// Init some values |
| 611 |
newBuf->requestsArraySize = maxIntracommSize; |
| 612 |
for (j = 0; j < maxIntracommSize; ++j) { |
| 613 |
newBuf->requests[j] = MPI_REQUEST_NULL; |
| 614 |
} |
| 615 |
newBuf->ck0 = newBuf->ck1 = 0xdeadbeefdeadbeef; |
| 616 |
|
| 617 |
// Put the buf into the free list |
| 618 |
newBuf->state = bufState_Free; |
| 619 |
newBuf->next = freeTileBufs_ptr; |
| 620 |
freeTileBufs_ptr = newBuf; |
| 621 |
} |
| 622 |
} |
| 623 |
|
| 624 |
|
| 625 |
|
| 626 |
bufHdr_t * |
| 627 |
getFreeBuf() |
| 628 |
{ |
| 629 |
int j; |
| 630 |
bufHdr_t *rtnValue = freeTileBufs_ptr; |
| 631 |
|
| 632 |
if (NULL != rtnValue) { |
| 633 |
ASSERT(bufState_Free == rtnValue->state); |
| 634 |
freeTileBufs_ptr = rtnValue->next; |
| 635 |
rtnValue->next = NULL; |
| 636 |
|
| 637 |
// Paranoia. This should already be the case |
| 638 |
for (j = 0; j < rtnValue->requestsArraySize; ++j) { |
| 639 |
rtnValue->requests[j] = MPI_REQUEST_NULL; |
| 640 |
} |
| 641 |
} |
| 642 |
return rtnValue; |
| 643 |
} |
| 644 |
|
| 645 |
|
| 646 |
///////////////////////////////////////////////////////////////// |
| 647 |
|
| 648 |
|
| 649 |
bufHdr_t * |
| 650 |
tryToReceiveDataTile( |
| 651 |
MPI_Comm dataIntercomm, |
| 652 |
int epochID, |
| 653 |
size_t expectedMsgSize, |
| 654 |
int *tileID) |
| 655 |
{ |
| 656 |
bufHdr_t *bufHdr; |
| 657 |
int pending, i, count; |
| 658 |
MPI_Status mpiStatus; |
| 659 |
|
| 660 |
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, dataIntercomm, |
| 661 |
&pending, &mpiStatus); |
| 662 |
|
| 663 |
// If no data are pending, or if we can't get a buffer to |
| 664 |
// put the pending data into, return NULL |
| 665 |
if (!pending) return NULL; |
| 666 |
if (((bufHdr = getFreeBuf()) == NULL)) { |
| 667 |
FPRINTF(stderr,"tile %d(%d) pending, but no buf to recv it\n", |
| 668 |
mpiStatus.MPI_TAG, mpiStatus.MPI_TAG >> numCheckBits); |
| 669 |
return NULL; |
| 670 |
} |
| 671 |
|
| 672 |
// Do sanity checks on the pending message |
| 673 |
MPI_Get_count(&mpiStatus, MPI_BYTE, &count); |
| 674 |
ASSERT(count == expectedMsgSize); |
| 675 |
ASSERT((mpiStatus.MPI_TAG & bitMask) == (epochID & bitMask)); |
| 676 |
|
| 677 |
// Receive the data we saw in the iprobe
| 678 |
MPI_Recv(bufHdr->payload, expectedMsgSize, MPI_BYTE, |
| 679 |
mpiStatus.MPI_SOURCE, mpiStatus.MPI_TAG, |
| 680 |
dataIntercomm, &mpiStatus); |
| 681 |
bufHdr->state = bufState_InUse; |
| 682 |
|
| 683 |
// Overrun check |
| 684 |
ASSERT(bufHdr->ck0==0xdeadbeefdeadbeef); |
| 685 |
ASSERT(bufHdr->ck0==bufHdr->ck1); |
| 686 |
|
| 687 |
// Return values |
| 688 |
*tileID = mpiStatus.MPI_TAG >> numCheckBits; |
| 689 |
|
| 690 |
|
| 691 |
FPRINTF(stderr, "recv tile %d(%d) from compute rank %d\n", |
| 692 |
mpiStatus.MPI_TAG, *tileID, mpiStatus.MPI_SOURCE); |
| 693 |
|
| 694 |
return bufHdr; |
| 695 |
} |
| 696 |
|
| 697 |
|
| 698 |
|
| 699 |
int |
| 700 |
tryToReceiveZSlab( |
| 701 |
void *buf, |
| 702 |
int expectedMsgSize, |
| 703 |
MPI_Comm intracomm) |
| 704 |
{ |
| 705 |
MPI_Status mpiStatus; |
| 706 |
int pending, count; |
| 707 |
|
| 708 |
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, intracomm, &pending, &mpiStatus); |
| 709 |
if (!pending) return -1; |
| 710 |
|
| 711 |
MPI_Get_count(&mpiStatus, MPI_BYTE, &count); |
| 712 |
ASSERT(count == expectedMsgSize); |
| 713 |
|
| 714 |
MPI_Recv(buf, count, MPI_BYTE, mpiStatus.MPI_SOURCE, |
| 715 |
mpiStatus.MPI_TAG, intracomm, &mpiStatus); |
| 716 |
|
| 717 |
FPRINTF(stderr, "recv slab %d from rank %d\n", |
| 718 |
mpiStatus.MPI_TAG, mpiStatus.MPI_SOURCE); |
| 719 |
|
| 720 |
// return the tileID |
| 721 |
return mpiStatus.MPI_TAG; |
| 722 |
} |
| 723 |
|
| 724 |
|
| 725 |
|
| 726 |
void |
| 727 |
redistributeZSlabs( |
| 728 |
bufHdr_t *bufHdr, |
| 729 |
int tileID, |
| 730 |
int zSlabsPer, |
| 731 |
int thisFieldNumZ, |
| 732 |
MPI_Comm intracomm) |
| 733 |
{ |
| 734 |
int pieceSize = zSlabsPer * tileOneZLevelSizeInBytes; |
| 735 |
int tileSizeInBytes = tileOneZLevelSizeInBytes * thisFieldNumZ; |
| 736 |
int offset = 0; |
| 737 |
int recvRank = 0; |
| 738 |
|
| 739 |
while ((offset + pieceSize) <= tileSizeInBytes) { |
| 740 |
|
| 741 |
MPI_Isend(bufHdr->payload + offset, pieceSize, MPI_BYTE, recvRank, |
| 742 |
tileID, intracomm, &(bufHdr->requests[recvRank])); |
| 743 |
ASSERT(MPI_REQUEST_NULL != bufHdr->requests[recvRank]); |
| 744 |
|
| 745 |
offset += pieceSize; |
| 746 |
recvRank += 1; |
| 747 |
} |
| 748 |
|
| 749 |
// There might be one last odd-sized piece |
| 750 |
if (offset < tileSizeInBytes) { |
| 751 |
pieceSize = tileSizeInBytes - offset; |
| 752 |
ASSERT(pieceSize % tileOneZLevelSizeInBytes == 0); |
| 753 |
|
| 754 |
MPI_Isend(bufHdr->payload + offset, pieceSize, MPI_BYTE, recvRank, |
| 755 |
tileID, intracomm, &(bufHdr->requests[recvRank])); |
| 756 |
ASSERT(MPI_REQUEST_NULL != bufHdr->requests[recvRank]); |
| 757 |
|
| 758 |
offset += pieceSize; |
| 759 |
recvRank += 1; |
| 760 |
} |
| 761 |
|
| 762 |
// Sanity check |
| 763 |
ASSERT(recvRank <= bufHdr->requestsArraySize); |
| 764 |
while (recvRank < bufHdr->requestsArraySize) { |
| 765 |
ASSERT(MPI_REQUEST_NULL == bufHdr->requests[recvRank++]);
| 766 |
} |
| 767 |
} |
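
// Example of the split above: for a field with zDepth == 90 handled by a group
// of 8 i/o ranks, zSlabsPer == divCeil(90,8) == 12, so ranks 0..6 each receive
// a 12-level piece of every tile and rank 7 receives the final odd-sized
// 6-level piece, matching the per-rank slab ranges computed in doNewEpoch().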
| 768 |
|
| 769 |
|
| 770 |
|
| 771 |
///////////////////////////////////////////////////////////////// |
| 772 |
// This is called by the i/o ranks |
| 773 |
void |
| 774 |
doNewEpoch(int epochID, int epochStyleIndex, int gcmIter) |
| 775 |
{ |
| 776 |
// In an i/o epoch, the i/o ranks are partitioned into groups, |
| 777 |
// each group dealing with one field. The ranks within a group: |
| 778 |
// (1) receive data tiles from the compute ranks |
| 779 |
// (2) slice the received data tiles into slabs in the 'z' |
| 780 |
// dimension, and redistribute the tile-slabs among the group. |
| 781 |
// (3) receive redistributed tile-slabs, and reconstruct a complete |
| 782 |
// field-slab for the whole field. |
| 783 |
// (4) Write the completed field-slab to disk. |
| 784 |
|
| 785 |
fieldInfoThisEpoch_t *fieldInfo; |
| 786 |
int intracommRank, intracommSize; |
| 787 |
|
| 788 |
int zSlabsPer; |
| 789 |
int myNumZSlabs, myFirstZSlab, myLastZSlab, myNumSlabPiecesToRecv; |
| 790 |
|
| 791 |
int numTilesRecvd, numSlabPiecesRecvd; |
| 792 |
|
| 793 |
|
| 794 |
// Find the descriptor for my assigned field for this epoch style. |
| 795 |
// It is the one whose dataIntercomm is not null |
| 796 |
fieldInfo = epochStyles[epochStyleIndex]; |
| 797 |
while (MPI_COMM_NULL == fieldInfo->dataIntercomm) ++fieldInfo; |
| 798 |
ASSERT('\0' != fieldInfo->dataFieldID); |
| 799 |
|
| 800 |
|
| 801 |
|
| 802 |
MPI_Comm_rank(fieldInfo->ioRanksIntracomm, &intracommRank); |
| 803 |
MPI_Comm_size(fieldInfo->ioRanksIntracomm, &intracommSize); |
| 804 |
|
| 805 |
|
| 806 |
// Compute which z slab(s) we will be reassembling. |
| 807 |
zSlabsPer = divCeil(fieldInfo->zDepth, intracommSize); |
| 808 |
myNumZSlabs = zSlabsPer; |
| 809 |
|
| 810 |
// Adjust myNumZSlabs in case it didn't divide evenly |
| 811 |
myFirstZSlab = intracommRank * myNumZSlabs; |
| 812 |
if (myFirstZSlab >= fieldInfo->zDepth) { |
| 813 |
myNumZSlabs = 0; |
| 814 |
} else if ((myFirstZSlab + myNumZSlabs) > fieldInfo->zDepth) { |
| 815 |
myNumZSlabs = fieldInfo->zDepth - myFirstZSlab; |
| 816 |
} else { |
| 817 |
myNumZSlabs = zSlabsPer; |
| 818 |
} |
| 819 |
myLastZSlab = myFirstZSlab + myNumZSlabs - 1; |
| 820 |
|
| 821 |
// If we were not assigned any z-slabs, we don't get any redistributed |
| 822 |
// tile-slabs. If we were assigned one or more slabs, we get |
| 823 |
// redistributed tile-slabs for every tile. |
| 824 |
myNumSlabPiecesToRecv = (0 == myNumZSlabs) ? 0 : totalNumTiles; |
| 825 |
|
| 826 |
|
| 827 |
numTilesRecvd = 0; |
| 828 |
numSlabPiecesRecvd = 0; |
| 829 |
|
| 830 |
//////////////////////////////////////////////////////////////////// |
| 831 |
// Main loop. Handle tiles from the current epoch |
| 832 |
for(;;) { |
| 833 |
bufHdr_t *bufHdr; |
| 834 |
|
| 835 |
//////////////////////////////////////////////////////////////// |
| 836 |
// Check for tiles from the computational tasks |
| 837 |
while (numTilesRecvd < fieldInfo->tileCount) { |
| 838 |
int tileID = -1; |
| 839 |
size_t msgSize = tileOneZLevelSizeInBytes * fieldInfo->zDepth; |
| 840 |
bufHdr = tryToReceiveDataTile(fieldInfo->dataIntercomm, epochID, |
| 841 |
msgSize, &tileID); |
| 842 |
if (NULL == bufHdr) break; // No tile was received |
| 843 |
|
| 844 |
numTilesRecvd += 1; |
| 845 |
redistributeZSlabs(bufHdr, tileID, zSlabsPer, |
| 846 |
fieldInfo->zDepth, fieldInfo->ioRanksIntracomm); |
| 847 |
|
| 848 |
// Add the bufHdr to the "in use" list |
| 849 |
bufHdr->next = inUseTileBufs_ptr; |
| 850 |
inUseTileBufs_ptr = bufHdr; |
| 851 |
|
| 852 |
|
| 853 |
} |
| 854 |
|
| 855 |
|
| 856 |
//////////////////////////////////////////////////////////////// |
| 857 |
// Check for tile-slabs redistributed from the i/o processes |
| 858 |
while (numSlabPiecesRecvd < myNumSlabPiecesToRecv) { |
| 859 |
int msgSize = tileOneZLevelSizeInBytes * myNumZSlabs; |
| 860 |
char data[msgSize]; |
| 861 |
|
| 862 |
int tileID = tryToReceiveZSlab(data, msgSize, fieldInfo->ioRanksIntracomm); |
| 863 |
if (tileID < 0) break; // No slab was received |
| 864 |
|
| 865 |
|
| 866 |
numSlabPiecesRecvd += 1; |
| 867 |
processSlabSection(fieldInfo, tileID, data, myNumZSlabs); |
| 868 |
|
| 869 |
|
| 870 |
// Can do the write here, or at the end of the epoch. |
| 871 |
// Probably want to make it asynchronous (waiting for |
| 872 |
// completion at the barrier at the start of the next epoch). |
| 873 |
//if (numSlabPiecesRecvd >= myNumSlabPiecesToRecv) { |
| 874 |
// ASSERT(numSlabPiecesRecvd == myNumSlabPiecesToRecv); |
| 875 |
// do_write(io_epoch,whichField,myFirstZSlab,myNumZSlabs); |
| 876 |
//} |
| 877 |
} |
| 878 |
|
| 879 |
|
| 880 |
//////////////////////////////////////////////////////////////// |
| 881 |
// Check if we can release any of the inUse buffers (i.e. the |
| 882 |
// isends we used to redistribute those z-slabs have completed). |
| 883 |
// We do this by detaching the inUse list, then examining each |
| 884 |
// element in the list, putting each element either into the |
| 885 |
// free list or back into the inUse list, as appropriate. |
| 886 |
bufHdr = inUseTileBufs_ptr; |
| 887 |
inUseTileBufs_ptr = NULL; |
| 888 |
while (NULL != bufHdr) { |
| 889 |
int count = 0; |
| 890 |
int completions[intracommSize]; |
| 891 |
MPI_Status statuses[intracommSize]; |
| 892 |
|
| 893 |
// Acquire the "next" bufHdr now, before we change anything. |
| 894 |
bufHdr_t *nextBufHeader = bufHdr->next; |
| 895 |
|
| 896 |
// Check to see if the Isend requests have completed for this buf |
| 897 |
MPI_Testsome(intracommSize, bufHdr->requests, |
| 898 |
&count, completions, statuses); |
| 899 |
|
| 900 |
if (MPI_UNDEFINED == count) { |
| 901 |
// A return of UNDEFINED means that none of the requests |
| 902 |
// is still outstanding, i.e. they have all completed. |
| 903 |
// Put the buf on the free list. |
| 904 |
bufHdr->state = bufState_Free; |
| 905 |
bufHdr->next = freeTileBufs_ptr; |
| 906 |
freeTileBufs_ptr = bufHdr; |
| 907 |
FPRINTF(stderr,"Rank %d freed a tile buffer\n", intracommRank); |
| 908 |
} else { |
| 909 |
// At least one request is still outstanding. |
| 910 |
// Put the buf on the inUse list. |
| 911 |
bufHdr->next = inUseTileBufs_ptr; |
| 912 |
inUseTileBufs_ptr = bufHdr; |
| 913 |
} |
| 914 |
|
| 915 |
// Continue with the next buffer
| 916 |
bufHdr = nextBufHeader; |
| 917 |
} |
| 918 |
|
| 919 |
|
| 920 |
//////////////////////////////////////////////////////////////// |
| 921 |
// Check if the epoch is complete |
| 922 |
if ((numTilesRecvd >= fieldInfo->tileCount) && |
| 923 |
(numSlabPiecesRecvd >= myNumSlabPiecesToRecv) && |
| 924 |
(NULL == inUseTileBufs_ptr)) |
| 925 |
{ |
| 926 |
ASSERT(numTilesRecvd == fieldInfo->tileCount); |
| 927 |
ASSERT(numSlabPiecesRecvd == myNumSlabPiecesToRecv); |
| 928 |
|
| 929 |
//fprintf(stderr,"rank %d %d %d %d\n",intracommRank,numTilesRecvd,numSlabPiecesRecvd,myNumZSlabs); |
| 930 |
|
| 931 |
// Ok, wrap up the current i/o epoch |
| 932 |
// Probably want to make this asynchronous (waiting for |
| 933 |
// completion at the start of the next epoch). |
| 934 |
do_write(epochID, fieldInfo, myFirstZSlab, myNumZSlabs, gcmIter); |
| 935 |
|
| 936 |
// The epoch is complete |
| 937 |
return; |
| 938 |
} |
| 939 |
} |
| 940 |
|
| 941 |
} |
| 942 |
|
| 943 |
|
| 944 |
|
| 945 |
|
| 946 |
|
| 947 |
|
| 948 |
|
| 949 |
// Main routine for the i/o tasks. Callers DO NOT RETURN from |
| 950 |
// this routine; they MPI_Finalize and exit. |
| 951 |
void |
| 952 |
ioRankMain(int totalNumTiles) |
| 953 |
{ |
| 954 |
// This is one of a group of i/o processes that will be receiving tiles |
| 955 |
// from the computational processes. This same process is also |
| 956 |
// responsible for compositing slices of tiles together into one or |
| 957 |
// more complete slabs, and then writing those slabs out to disk. |
| 958 |
// |
| 959 |
// When a tile is received, it is cut up into slices, and each slice is |
| 960 |
// sent to the appropriate compositing task that is dealing with those |
| 961 |
// "z" level(s) (possibly including ourselves). These are tagged with |
| 962 |
// the tile-id. When we *receive* such a message (from ourselves, or |
| 963 |
// from any other process in this group), we unpack the data (stripping |
| 964 |
// the ghost cells), and put them into the slab we are assembling. Once |
| 965 |
// a slab is fully assembled, it is written to disk. |
| 966 |
|
| 967 |
int i, size, count, numTileBufs; |
| 968 |
MPI_Status status; |
| 969 |
int currentEpochID = 0; |
| 970 |
int maxTileCount = 0, max3DTileCount = 0, maxIntracommSize = 0; |
| 971 |
|
| 972 |
int numComputeRanks, epochStyleIndex; |
| 973 |
MPI_Comm_remote_size(globalIntercomm, &numComputeRanks); |
| 974 |
|
| 975 |
|
| 976 |
//////////////////////////////////////////////////////////// |
| 977 |
//// get global tile info via bcast over the global intercom |
| 978 |
|
| 979 |
|
| 980 |
// int ierr = MPI_Bcast(&NTILES,1,MPI_INT,0,globalIntercomm); |
| 981 |
|
| 982 |
int data[9]; |
| 983 |
|
| 984 |
int ierr = MPI_Bcast(data,9,MPI_INT,0,globalIntercomm); |
| 985 |
|
| 986 |
NTILES = data[3]; |
| 987 |
TILE_X = data[5]; |
| 988 |
TILE_Y = data[6]; |
| 989 |
XGHOSTS = data[7]; |
| 990 |
YGHOSTS = data[8]; |
| 991 |
|
| 992 |
tileOneZLevelItemCount = ((TILE_X + 2*XGHOSTS) * (TILE_Y + 2*YGHOSTS)); |
| 993 |
tileOneZLevelSizeInBytes = (tileOneZLevelItemCount * datumSize); // also calculated in compute ranks, urghh |
| 994 |
|
| 995 |
int *xoff = malloc(NTILES*sizeof(int)); |
| 996 |
ASSERT(xoff); |
| 997 |
int *yoff = malloc(NTILES*sizeof(int)); |
| 998 |
ASSERT(yoff); |
| 999 |
int *xskip = malloc(NTILES*sizeof(int)); |
| 1000 |
ASSERT(xskip); |
| 1001 |
|
| 1002 |
offsetTable = malloc((NTILES+1)*sizeof(tile_layout_t)); |
| 1003 |
ASSERT(offsetTable); |
| 1004 |
|
| 1005 |
ierr = MPI_Bcast(xoff,NTILES,MPI_INT,0,globalIntercomm); |
| 1006 |
ierr = MPI_Bcast(yoff,NTILES,MPI_INT,0,globalIntercomm); |
| 1007 |
ierr = MPI_Bcast(xskip,NTILES,MPI_INT,0,globalIntercomm); |
| 1008 |
|
| 1009 |
for (i=0;i<NTILES;++i){ |
| 1010 |
offsetTable[i+1].off = NUM_X*(yoff[i]-1) + xoff[i] - 1; |
| 1011 |
offsetTable[i+1].skip = xskip[i]; |
| 1012 |
} |
| 1013 |
|
| 1014 |
free(xoff); |
| 1015 |
free(yoff); |
| 1016 |
free(xskip); |
| 1017 |
|
| 1018 |
/////////////////////////////////////////////////////////////// |
| 1019 |
|
| 1020 |
|
| 1021 |
// At this point, the multitude of various inter-communicators have |
| 1022 |
// been created and stored in the various epoch "style" arrays. |
| 1023 |
// The next step is to have the computational tasks "register" the |
| 1024 |
// data tiles they will send. In this version of the code, we only |
| 1025 |
// track and store *counts*, not the actual tileIDs. |
| 1026 |
for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) { |
| 1027 |
fieldInfoThisEpoch_t *p = NULL; |
| 1028 |
fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex]; |
| 1029 |
int thisIntracommSize; |
| 1030 |
|
| 1031 |
// Find the field we were assigned for this epoch style |
| 1032 |
int curFieldIndex = -1; |
| 1033 |
while ('\0' != thisEpochStyle[++curFieldIndex].dataFieldID) { |
| 1034 |
p = &(thisEpochStyle[curFieldIndex]); |
| 1035 |
if (MPI_COMM_NULL != p->registrationIntercomm) break; |
| 1036 |
} |
| 1037 |
ASSERT(NULL != p); |
| 1038 |
ASSERT('\0' != p->dataFieldID); |
| 1039 |
MPI_Comm_size(p->ioRanksIntracomm, &size); |
| 1040 |
if (size > maxIntracommSize) maxIntracommSize = size; |
| 1041 |
|
| 1042 |
|
| 1043 |
// Receive a message from *each* computational rank telling us |
| 1044 |
// a count of how many tiles that rank will send during epochs |
| 1045 |
// of this style. Note that some of these counts may be zero |
| 1046 |
for (i = 0; i < numComputeRanks; ++i) { |
| 1047 |
MPI_Recv(NULL, 0, MPI_BYTE, MPI_ANY_SOURCE, MPI_ANY_TAG, |
| 1048 |
p->registrationIntercomm, &status); |
| 1049 |
p->tileCount += status.MPI_TAG; |
| 1050 |
} |
| 1051 |
if (p->tileCount > maxTileCount) maxTileCount = p->tileCount; |
| 1052 |
if (p->zDepth > 1) { |
| 1053 |
if (p->tileCount > max3DTileCount) max3DTileCount = p->tileCount; |
| 1054 |
} |
| 1055 |
|
| 1056 |
// Sanity check |
| 1057 |
MPI_Allreduce (&(p->tileCount), &count, 1, |
| 1058 |
MPI_INT, MPI_SUM, p->ioRanksIntracomm); |
| 1059 |
ASSERT(count == totalNumTiles); |
| 1060 |
} |
| 1061 |
|
| 1062 |
|
| 1063 |
// In some as-of-yet-undetermined fashion, we decide how many |
| 1064 |
// buffers to allocate to hold tiles received from the computational |
| 1065 |
// tasks. The number of such buffers is more-or-less arbitrary (the |
| 1066 |
// code should be able to function with any number greater than zero). |
| 1067 |
// More buffers means more parallelism, but uses more space. |
| 1068 |
// Note that maxTileCount is an upper bound on the number of buffers |
| 1069 |
// that we can usefully use. Note also that if we are assigned a 2D |
| 1070 |
// field, we might be assigned to receive a very large number of tiles
| 1071 |
// for that field in that epoch style. |
| 1072 |
|
| 1073 |
// Hack - for now, just pick a value for numTileBufs |
| 1074 |
numTileBufs = (max3DTileCount > 0) ? max3DTileCount : maxTileCount; |
| 1075 |
if (numTileBufs < 4) numTileBufs = 4; |
| 1076 |
if (numTileBufs > 15) numTileBufs = 15; |
| 1077 |
// if (numTileBufs > 25) numTileBufs = 25; |
| 1078 |
|
| 1079 |
allocateTileBufs(numTileBufs, maxIntracommSize); |
| 1080 |
countBufs(numTileBufs); |
| 1081 |
|
| 1082 |
|
| 1083 |
//////////////////////////////////////////////////////////////////// |
| 1084 |
// Main loop. |
| 1085 |
//////////////////////////////////////////////////////////////////// |
| 1086 |
|
| 1087 |
for (;;) { |
| 1088 |
int cmd[4]; |
| 1089 |
|
| 1090 |
// Make sure all the i/o ranks have completed processing the prior |
| 1091 |
// cmd (i.e. the prior i/o epoch is complete). |
| 1092 |
MPI_Barrier(ioIntracomm); |
| 1093 |
|
| 1094 |
if (0 == ioIntracommRank) { |
| 1095 |
fprintf(stderr, "I/O ranks waiting for new epoch\n"); |
| 1096 |
MPI_Send(NULL, 0, MPI_BYTE, 0, cmd_epochComplete, globalIntercomm); |
| 1097 |
|
| 1098 |
MPI_Recv(cmd, 4, MPI_INT, 0, 0, globalIntercomm, MPI_STATUS_IGNORE); |
| 1099 |
fprintf(stderr, "I/O ranks begining new epoch: %d, gcmIter = %d\n", cmd[1],cmd[3]); |
| 1100 |
|
| 1101 |
// before we start a new epoch, have i/o rank 0: |
| 1102 |
// determine output filenames for this epoch |
| 1103 |
// clean up any extant files with same names |
| 1104 |
// write .meta files |
| 1105 |
|
| 1106 |
if (cmd_exit != cmd[0]){ |
| 1107 |
|
| 1108 |
fprintf(stderr,"new epoch: epoch %d, style %d, gcmIter %d\n", cmd[1],cmd[2],cmd[3]); |
| 1109 |
|
| 1110 |
int epochStyle = cmd[2]; |
| 1111 |
int gcmIter = cmd[3]; |
| 1112 |
|
| 1113 |
fieldInfoThisEpoch_t *fieldInfo; |
| 1114 |
fieldInfo = epochStyles[epochStyle]; |
| 1115 |
char s[1024]; |
| 1116 |
int res; |
| 1117 |
FILE *fp; |
| 1118 |
|
| 1119 |
if (fieldInfo->pickup==0){ // for non-pickups, need to loop over individual fields |
| 1120 |
char f; |
| 1121 |
while ((f = fieldInfo->dataFieldID) != '\0'){
| 1122 |
sprintf(s,fieldInfo->filenameTemplate,gcmIter,"data"); |
| 1123 |
fprintf(stderr,"%s\n",s); |
| 1124 |
res = unlink(s); |
| 1125 |
if (-1==res && ENOENT!=errno) fprintf(stderr,"unable to rm %s\n",s); |
| 1126 |
|
| 1127 |
// skip writing meta files for non-pickup fields |
| 1128 |
/* |
| 1129 |
sprintf(s,fieldInfo->filenameTemplate,gcmIter,"meta"); |
| 1130 |
fp = fopen(s,"w+"); |
| 1131 |
fclose(fp); |
| 1132 |
*/ |
| 1133 |
|
| 1134 |
++fieldInfo; |
| 1135 |
|
| 1136 |
} |
| 1137 |
} |
| 1138 |
|
| 1139 |
else { // single pickup or pickup_seaice file |
| 1140 |
|
| 1141 |
sprintf(s,fieldInfo->filenameTemplate,gcmIter,"data"); |
| 1142 |
fprintf(stderr,"%s\n",s); |
| 1143 |
res = unlink(s); |
| 1144 |
if (-1==res && ENOENT!=errno) fprintf(stderr,"unable to rm %s\n",s); |
| 1145 |
|
| 1146 |
sprintf(s,fieldInfo->filenameTemplate,gcmIter,"meta"); |
| 1147 |
fp = fopen(s,"w+"); |
| 1148 |
write_pickup_meta(fp, gcmIter, fieldInfo->pickup); |
| 1149 |
fclose(fp); |
| 1150 |
|
| 1151 |
} |
| 1152 |
} |
| 1153 |
} |
| 1154 |
MPI_Bcast(cmd, 4, MPI_INT, 0, ioIntracomm); |
| 1155 |
|
| 1156 |
switch (cmd[0]) { |
| 1157 |
|
| 1158 |
case cmd_exit: |
| 1159 |
// Don't bother trying to disconnect and free the |
| 1160 |
// plethora of communicators; just exit. |
| 1161 |
FPRINTF(stderr,"Received shutdown message\n"); |
| 1162 |
MPI_Finalize(); |
| 1163 |
exit(0); |
| 1164 |
break; |
| 1165 |
|
| 1166 |
case cmd_newEpoch: |
| 1167 |
if ((currentEpochID + 1) != cmd[1]) { |
| 1168 |
fprintf(stderr, "ERROR: Missing i/o epoch? was %d, " |
| 1169 |
"but is now %d ??\n", currentEpochID, cmd[1]); |
| 1170 |
sleep(1); // Give the message a chance to propagate |
| 1171 |
abort(); |
| 1172 |
} |
| 1173 |
currentEpochID = cmd[1]; |
| 1174 |
|
| 1175 |
if (outBuf) memset(outBuf,0,outBufSize); // zero the outBuf (if already allocated), so dry tiles are well defined
| 1176 |
|
| 1177 |
doNewEpoch(cmd[1], cmd[2], cmd[3]); |
| 1178 |
break; |
| 1179 |
|
| 1180 |
default: |
| 1181 |
fprintf(stderr, "Unexpected epoch command: %d %d %d %d\n", |
| 1182 |
cmd[0], cmd[1], cmd[2], cmd[3]); |
| 1183 |
sleep(1); |
| 1184 |
abort(); |
| 1185 |
break; |
| 1186 |
} |
| 1187 |
|
| 1188 |
} |
| 1189 |
|
| 1190 |
} |
| 1191 |
|
| 1192 |
|
| 1193 |
|
| 1194 |
/////////////////////////////////////////////////////////////////////////////////// |
| 1195 |
|
| 1196 |
int |
| 1197 |
findField(const char c) |
| 1198 |
{ |
| 1199 |
int i; |
| 1200 |
for (i = 0; i < numAllFields; ++i) { |
| 1201 |
if (c == fieldDepths[i].dataFieldID) return i; |
| 1202 |
} |
| 1203 |
|
| 1204 |
// Give the error message a chance to propagate before exiting. |
| 1205 |
fprintf(stderr, "ERROR: Field not found: '%c'\n", c); |
| 1206 |
sleep(1); |
| 1207 |
abort(); |
| 1208 |
} |
| 1209 |
|
| 1210 |
|
| 1211 |
|
| 1212 |
// Given a number of ranks and a set of fields we will want to output, |
| 1213 |
// figure out how to distribute the ranks among the fields. |
| 1214 |
void |
| 1215 |
computeIORankAssigments( |
| 1216 |
int numComputeRanks, |
| 1217 |
int numIORanks, |
| 1218 |
int numFields, |
| 1219 |
fieldInfoThisEpoch_t *fields, |
| 1220 |
int assignments[]) |
| 1221 |
{ |
| 1222 |
int i,j,k, sum, count; |
| 1223 |
|
| 1224 |
int numIONodes = numIORanks / numRanksPerNode; |
| 1225 |
int numIORanksThisField[numFields]; |
| 1226 |
long int bytesPerIORankThisField[numFields]; |
| 1227 |
int expectedMessagesPerRankThisField[numFields]; |
| 1228 |
long int fieldSizes[numFields]; |
| 1229 |
|
| 1230 |
struct ioNodeInfo_t { |
| 1231 |
int expectedNumMessagesThisNode; |
| 1232 |
int numCoresAssigned; |
| 1233 |
int *assignedFieldThisCore; |
| 1234 |
} ioNodeInfo[numIONodes]; |
| 1235 |
|
| 1236 |
// Since numRanksPerNode might be dynamically determined, we have |
| 1237 |
// to dynamically allocate the assignedFieldThisCore arrays. |
| 1238 |
for (i = 0; i < numIONodes; ++i) { |
| 1239 |
ioNodeInfo[i].assignedFieldThisCore = malloc(sizeof(int)*numRanksPerNode); |
| 1240 |
ASSERT(NULL != ioNodeInfo[i].assignedFieldThisCore); |
| 1241 |
} |
| 1242 |
|
| 1243 |
ASSERT((numIONodes*numRanksPerNode) == numIORanks); |
| 1244 |
|
| 1245 |
|
| 1246 |
////////////////////////////////////////////////////////////// |
| 1247 |
// Compute the size for each field in this epoch style |
| 1248 |
for (i = 0; i < numFields; ++i) { |
| 1249 |
ASSERT((1 == fields[i].zDepth) || (NUM_Z == fields[i].zDepth) || (MULTDIM == fields[i].zDepth)); |
| 1250 |
fieldSizes[i] = twoDFieldSizeInBytes * fields[i].zDepth; |
| 1251 |
} |
| 1252 |
|
| 1253 |
|
| 1254 |
///////////////////////////////////////////////////////// |
| 1255 |
// Distribute the available i/o ranks among the fields, |
| 1256 |
// proportionally based on field size. |
| 1257 |
|
| 1258 |
// First, assign one rank per field |
| 1259 |
ASSERT(numIORanks >= numFields); |
| 1260 |
for (i = 0; i < numFields; ++i) { |
| 1261 |
numIORanksThisField[i] = 1; |
| 1262 |
bytesPerIORankThisField[i] = fieldSizes[i]; |
| 1263 |
} |
| 1264 |
|
| 1265 |
// Now apportion any extra ranks |
| 1266 |
for (; i < numIORanks; ++i) { |
| 1267 |
|
| 1268 |
// Find the field 'k' with the most bytesPerIORank |
| 1269 |
k = 0; |
| 1270 |
for (j = 1; j < numFields; ++j) { |
| 1271 |
if (bytesPerIORankThisField[j] > bytesPerIORankThisField[k]) { |
| 1272 |
k = j; |
| 1273 |
} |
| 1274 |
} |
| 1275 |
|
| 1276 |
// Assign an i/o rank to that field |
| 1277 |
numIORanksThisField[k] += 1; |
| 1278 |
bytesPerIORankThisField[k] = fieldSizes[k] / numIORanksThisField[k]; |
| 1279 |
} |
| 1280 |
|
| 1281 |
//////////////////////////////////////////////////////////// |
| 1282 |
// At this point, all the i/o ranks should be apportioned |
| 1283 |
// among the fields. Check we didn't screw up the count. |
| 1284 |
for (sum = 0, i = 0; i < numFields; ++i) { |
| 1285 |
sum += numIORanksThisField[i]; |
| 1286 |
} |
| 1287 |
ASSERT(sum == numIORanks); |
| 1288 |
|
| 1289 |
|
| 1290 |
|
| 1291 |
////////////////////////////////////////////////////////////////// |
| 1292 |
// The *number* of i/o ranks assigned to a field is based on the |
| 1293 |
// field size. But the *placement* of those ranks is based on |
| 1294 |
// the expected number of messages the process will receive. |
| 1295 |
// [In particular, if we have several fields each with only one |
| 1296 |
// assigned i/o rank, we do not want to place them all on the |
| 1297 |
// same node if we don't have to.] This traffic-load balance |
| 1298 |
// strategy is an approximation at best since the messages may |
| 1299 |
// not all be the same size (e.g. 2D fields vs. 3D fields), so |
| 1300 |
// "number of messages" is not the same thing as "traffic". |
| 1301 |
// But it should suffice. |
| 1302 |
|
| 1303 |
// Init a couple of things |
| 1304 |
for (i = 0; i < numFields; ++i) { |
| 1305 |
expectedMessagesPerRankThisField[i] = |
| 1306 |
numComputeRanks / numIORanksThisField[i]; |
| 1307 |
} |
| 1308 |
for (i = 0; i < numIONodes; ++i) { |
| 1309 |
ioNodeInfo[i].expectedNumMessagesThisNode = 0; |
| 1310 |
ioNodeInfo[i].numCoresAssigned = 0; |
| 1311 |
for (j = 0; j < numRanksPerNode; ++j) { |
| 1312 |
ioNodeInfo[i].assignedFieldThisCore[j] = -1; |
| 1313 |
} |
| 1314 |
} |
| 1315 |
|
| 1316 |
/////////////////////////////////////////////////////////////////// |
| 1317 |
// Select the i/o node with the smallest expectedNumMessages, and |
| 1318 |
// assign it a rank from the field with the highest remaining |
| 1319 |
// expectedMessagesPerRank. Repeat until everything is assigned. |
| 1320 |
// (Yes, this could be a lot faster, but isn't worth the trouble.) |
| 1321 |
|
| 1322 |
for (count = 0; count < numIORanks; ++count) { |
| 1323 |
|
| 1324 |
// Make an initial choice of node |
| 1325 |
for (i = 0; i < numIONodes; ++i) { |
| 1326 |
if (ioNodeInfo[i].numCoresAssigned < numRanksPerNode) break; |
| 1327 |
} |
| 1328 |
j = i; |
| 1329 |
ASSERT(j < numIONodes); |
| 1330 |
|
| 1331 |
// Search for a better choice |
| 1332 |
for (++i; i < numIONodes; ++i) { |
| 1333 |
if (ioNodeInfo[i].numCoresAssigned >= numRanksPerNode) continue; |
| 1334 |
if (ioNodeInfo[i].expectedNumMessagesThisNode < |
| 1335 |
ioNodeInfo[j].expectedNumMessagesThisNode) |
| 1336 |
{ |
| 1337 |
j = i; |
| 1338 |
} |
| 1339 |
} |
| 1340 |
|
| 1341 |
|
| 1342 |
// Make an initial choice of field |
| 1343 |
for (i = 0; i < numFields; ++i) { |
| 1344 |
if (numIORanksThisField[i] > 0) break; |
| 1345 |
} |
| 1346 |
k = i; |
| 1347 |
ASSERT(k < numFields); |
| 1348 |
|
| 1349 |
// Search for a better choice |
| 1350 |
for (++i; i < numFields; ++i) { |
| 1351 |
if (numIORanksThisField[i] <= 0) continue; |
| 1352 |
if (expectedMessagesPerRankThisField[i] > |
| 1353 |
expectedMessagesPerRankThisField[k]) |
| 1354 |
{ |
| 1355 |
k = i; |
| 1356 |
} |
| 1357 |
} |
| 1358 |
|
| 1359 |
// Put one rank from the chosen field onto the chosen node |
| 1360 |
ioNodeInfo[j].expectedNumMessagesThisNode += expectedMessagesPerRankThisField[k]; |
| 1361 |
ioNodeInfo[j].assignedFieldThisCore[ioNodeInfo[j].numCoresAssigned] = k; |
| 1362 |
ioNodeInfo[j].numCoresAssigned += 1; |
| 1363 |
numIORanksThisField[k] -= 1; |
| 1364 |
} |
| 1365 |
|
| 1366 |
// Sanity check - all ranks were assigned to a core |
| 1367 |
for (i = 1; i < numFields; ++i) { |
| 1368 |
ASSERT(0 == numIORanksThisField[i]); |
| 1369 |
} |
| 1370 |
// Sanity check - all cores were assigned a rank |
| 1371 |
for (i = 1; i < numIONodes; ++i) { |
| 1372 |
ASSERT(numRanksPerNode == ioNodeInfo[i].numCoresAssigned); |
| 1373 |
} |
| 1374 |
|
| 1375 |
|
| 1376 |
///////////////////////////////////// |
| 1377 |
// Return the computed assignments |
| 1378 |
for (i = 0; i < numIONodes; ++i) { |
| 1379 |
for (j = 0; j < numRanksPerNode; ++j) { |
| 1380 |
assignments[i*numRanksPerNode + j] = |
| 1381 |
ioNodeInfo[i].assignedFieldThisCore[j]; |
| 1382 |
} |
| 1383 |
} |
| 1384 |
|
| 1385 |
// Clean up |
| 1386 |
for (i = 0; i < numIONodes; ++i) { |
| 1387 |
free(ioNodeInfo[i].assignedFieldThisCore); |
| 1388 |
} |
| 1389 |
|
| 1390 |
} |
| 1391 |
|
| 1392 |
////////////////////////////////////////////////////////////////////////////////// |
| 1393 |
|
| 1394 |
|
| 1395 |
|
| 1396 |
|
| 1397 |
int |
| 1398 |
isIORank(int commRank, int totalNumNodes, int numIONodes) |
| 1399 |
{ |
| 1400 |
// Figure out if this rank is on a node that will do i/o. |
| 1401 |
// Note that the i/o nodes are distributed throughout the |
| 1402 |
// task, not clustered together. |
| 1403 |
int ioNodeStride = totalNumNodes / numIONodes; |
| 1404 |
int thisRankNode = commRank / numRanksPerNode; |
| 1405 |
int n = thisRankNode / ioNodeStride; |
| 1406 |
return (((n * ioNodeStride) == thisRankNode) && (n < numIONodes)) ? 1 : 0; |
| 1407 |
} |
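
// Example: with 96 nodes in total and 12 i/o nodes, ioNodeStride == 8, so the
// ranks living on nodes 0, 8, 16, ..., 88 are the i/o ranks and everyone else
// computes.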
| 1408 |
|
| 1409 |
|
| 1410 |
// Find the number of *physical cores* on the current node |
| 1411 |
// (ignore hyperthreading). This should work for pretty much |
| 1412 |
// any Linux based system (and fail horribly for anything else). |
| 1413 |
int |
| 1414 |
getNumCores(void) |
| 1415 |
{ |
| 1416 |
// Hard-wired override: the dynamic detection below cratered on at least
// one system, so return a fixed core count until that is understood.
// (Everything after this return is currently unreachable.)
return 20;
| 1417 |
|
| 1418 |
const char *magic = "cat /proc/cpuinfo | egrep \"core id|physical id\" | "
| 1419 |
"tr -d \"\\n\" | sed s/physical/\\\\nphysical/g | " |
| 1420 |
"grep -v ^$ | sort -u | wc -l"; |
| 1421 |
|
| 1422 |
FILE *fp = popen(magic,"r"); |
| 1423 |
ASSERT(fp); |
| 1424 |
|
| 1425 |
int rtnValue = -1; |
| 1426 |
|
| 1427 |
int res = fscanf(fp,"%d",&rtnValue); |
| 1428 |
|
| 1429 |
ASSERT(1==res); |
| 1430 |
|
| 1431 |
pclose(fp); |
| 1432 |
|
| 1433 |
ASSERT(rtnValue > 0); |
| 1434 |
return rtnValue; |
| 1435 |
} |
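
// Illustrative alternative (a sketch only; not called anywhere, and the
// hard-wired value above remains in effect): sysconf() reports the number
// of *online logical* CPUs, which over-counts physical cores when
// hyperthreading is enabled, but avoids the popen/sed pipeline above.
static int
getNumLogicalCPUs(void)
{
    long n = sysconf(_SC_NPROCESSORS_ONLN);
    return (n > 0) ? (int) n : 1;
}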
| 1436 |
|
| 1437 |
|
| 1438 |
|
| 1439 |
//////////////////////////////////////////////////////////////////////// |
| 1440 |
//////////////////////////////////////////////////////////////////////// |
| 1441 |
// Routines called by the mitGCM code |
| 1442 |
|
| 1443 |
|
| 1444 |
//////////////////////////////////////// |
| 1445 |
// Various "one time" initializations |
| 1446 |
void |
| 1447 |
f1( |
| 1448 |
MPI_Comm parentComm, |
| 1449 |
int numComputeRanks, |
| 1450 |
int numTiles, |
| 1451 |
MPI_Comm *rtnComputeComm) |
| 1452 |
{ |
| 1453 |
MPI_Comm newIntracomm;
| 1454 |
int newIntracommRank;
| 1455 |
int parentSize, parentRank; |
| 1456 |
int numIONodes, numIORanks; |
| 1457 |
int mpiIsInitialized, *intPtr, tagUBexists; |
| 1458 |
int i, j, nF, compRoot, fieldIndex, epochStyleIndex; |
| 1459 |
int totalNumNodes, numComputeNodes, newIntracommSize; |
| 1460 |
|
| 1461 |
// Init globals |
| 1462 |
totalNumTiles = numTiles; |
| 1463 |
if (numRanksPerNode <= 0) { |
| 1464 |
// Might also want to check for an env var (or something) |
| 1465 |
numRanksPerNode = getNumCores(); |
| 1466 |
} |
| 1467 |
|
| 1468 |
// Fill in the zDepth field of the fieldInfoThisEpoch_t descriptors |
| 1469 |
for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) { |
| 1470 |
fieldInfoThisEpoch_t *f, *thisEpochStyle = epochStyles[epochStyleIndex]; |
| 1471 |
|
| 1472 |
int curFieldIndex = -1; |
| 1473 |
while ((f = &(thisEpochStyle[++curFieldIndex]))->dataFieldID != '\0') { |
| 1474 |
i = findField(f->dataFieldID); |
| 1475 |
if (-1 != f->zDepth) { |
| 1476 |
if (f->zDepth != fieldDepths[i].numZ) { |
| 1477 |
fprintf(stderr, "Inconsistent z-depth for field '%c': " |
| 1478 |
"fieldDepths[%d] and epoch style %d\n", |
| 1479 |
f->dataFieldID, i, epochStyleIndex); |
| 1480 |
} |
| 1481 |
} |
| 1482 |
f->zDepth = fieldDepths[i].numZ; |
| 1483 |
} |
| 1484 |
} |
| 1485 |
|
| 1486 |
|
| 1487 |
// Find the max MPI "tag" value |
| 1488 |
MPI_Attr_get(MPI_COMM_WORLD, MPI_TAG_UB, &intPtr, &tagUBexists); |
| 1489 |
ASSERT(tagUBexists); |
| 1490 |
maxTagValue = *intPtr; |
| 1491 |
|
| 1492 |
|
| 1493 |
// Info about the parent communicator |
| 1494 |
MPI_Initialized(&mpiIsInitialized); |
| 1495 |
ASSERT(mpiIsInitialized); |
| 1496 |
MPI_Comm_size(parentComm, &parentSize); |
| 1497 |
MPI_Comm_rank(parentComm, &parentRank); |
| 1498 |
|
| 1499 |
|
| 1500 |
// Figure out how many nodes we can use for i/o. |
| 1501 |
// To make things (e.g. memory calculations) simpler, we want |
| 1502 |
// a node to have either all compute ranks, or all i/o ranks. |
| 1503 |
// If numRanksPerNode does not evenly divide numComputeRanks, we have
| 1504 |
// to round up in favor of the compute side. |
| 1505 |
|
| 1506 |
totalNumNodes = divCeil(parentSize, numRanksPerNode); |
| 1507 |
numComputeNodes = divCeil(numComputeRanks, numRanksPerNode); |
| 1508 |
numIONodes = (parentSize - numComputeRanks) / numRanksPerNode; |
| 1509 |
ASSERT(numIONodes > 0); |
| 1510 |
ASSERT(numIONodes <= (totalNumNodes - numComputeNodes)); |
| 1511 |
numIORanks = numIONodes * numRanksPerNode; |
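
// Worked example (hypothetical numbers): parentSize = 980,
// numRanksPerNode = 20, numComputeRanks = 925:
//   totalNumNodes   = divCeil(980, 20) = 49
//   numComputeNodes = divCeil(925, 20) = 47
//   numIONodes      = (980 - 925) / 20 = 2,  numIORanks = 40
// which leaves 980 - 925 - 40 = 15 "excess" ranks (handled below).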
| 1512 |
|
| 1513 |
|
| 1514 |
// It is surprisingly easy to launch the job with the wrong number |
| 1515 |
// of ranks, particularly if the number of compute ranks is not a |
| 1516 |
// multiple of numRanksPerNode (the tendency is to just launch one |
| 1517 |
// rank per core for all available cores). So we introduce a third |
| 1518 |
// option for a rank: besides being an i/o rank or a compute rank, |
| 1519 |
// it might instead be an "excess" rank, in which case it just |
| 1520 |
// calls MPI_Finalize and exits. |
| 1521 |
|
| 1522 |
typedef enum { isCompute, isIO, isExcess } rankType; |
| 1523 |
rankType thisRanksType; |
| 1524 |
|
| 1525 |
if (isIORank(parentRank, totalNumNodes, numIONodes)) { |
| 1526 |
thisRanksType = isIO; |
| 1527 |
} else if (parentRank >= numComputeRanks + numIORanks) { |
| 1528 |
thisRanksType = isExcess; |
| 1529 |
} else { |
| 1530 |
thisRanksType = isCompute; |
| 1531 |
} |
| 1532 |
|
| 1533 |
// Split the parent communicator into parts |
| 1534 |
MPI_Comm_split(parentComm, thisRanksType, parentRank, &newIntracomm); |
| 1535 |
MPI_Comm_rank(newIntracomm, &newIntracommRank); |
| 1536 |
|
| 1537 |
// Excess ranks disappear |
| 1538 |
// N.B. "parentSize" still includes these ranks. |
| 1539 |
if (isExcess == thisRanksType) { |
| 1540 |
MPI_Finalize(); |
| 1541 |
exit(0); |
| 1542 |
} |
| 1543 |
|
| 1544 |
// Sanity check |
| 1545 |
MPI_Comm_size(newIntracomm, &newIntracommSize); |
| 1546 |
if (isIO == thisRanksType) { |
| 1547 |
ASSERT(newIntracommSize == numIORanks); |
| 1548 |
} else { |
| 1549 |
ASSERT(newIntracommSize == numComputeRanks); |
| 1550 |
} |
| 1551 |
|
| 1552 |
|
| 1553 |
// Create a special intercomm from the i/o and compute parts |
| 1554 |
if (isIO == thisRanksType) { |
| 1555 |
// Set globals |
| 1556 |
ioIntracomm = newIntracomm; |
| 1557 |
MPI_Comm_rank(ioIntracomm, &ioIntracommRank); |
| 1558 |
|
| 1559 |
// Find the lowest computation rank |
| 1560 |
for (i = 0; i < parentSize; ++i) { |
| 1561 |
if (!isIORank(i, totalNumNodes, numIONodes)) break; |
| 1562 |
} |
| 1563 |
} else { // isCompute |
| 1564 |
// Set globals |
| 1565 |
computeIntracomm = newIntracomm; |
| 1566 |
MPI_Comm_rank(computeIntracomm, &computeIntracommRank); |
| 1567 |
|
| 1568 |
// Find the lowest IO rank |
| 1569 |
for (i = 0; i < parentSize; ++i) { |
| 1570 |
if (isIORank(i, totalNumNodes, numIONodes)) break; |
| 1571 |
} |
| 1572 |
} |
| 1573 |
MPI_Intercomm_create(newIntracomm, 0, parentComm, i, 0, &globalIntercomm); |
| 1574 |
|
| 1575 |
|
| 1576 |
|
| 1577 |
/////////////////////////////////////////////////////////////////// |
| 1578 |
// For each different i/o epoch style, split the i/o ranks among |
| 1579 |
// the fields, and create an inter-communicator for each split. |
| 1580 |
|
| 1581 |
for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) { |
| 1582 |
fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex]; |
| 1583 |
int fieldAssignments[numIORanks]; |
| 1584 |
int rankAssignments[numComputeRanks + numIORanks]; |
| 1585 |
|
| 1586 |
// Count the number of fields in this epoch style |
| 1587 |
for (nF = 0; thisEpochStyle[nF].dataFieldID != '\0'; ++nF) ; |
| 1588 |
|
| 1589 |
// Decide how to apportion the i/o ranks among the fields |
| 1590 |
for (i=0; i < numIORanks; ++i) fieldAssignments[i] = -1; |
| 1591 |
computeIORankAssigments(numComputeRanks, numIORanks, nF, |
| 1592 |
thisEpochStyle, fieldAssignments); |
| 1593 |
// Sanity check |
| 1594 |
for (i=0; i < numIORanks; ++i) { |
| 1595 |
ASSERT((fieldAssignments[i] >= 0) && (fieldAssignments[i] < nF)); |
| 1596 |
} |
| 1597 |
|
| 1598 |
// Embed the i/o rank assignments into an array holding |
| 1599 |
// the assignments for *all* the ranks (i/o and compute). |
| 1600 |
// Rank assignment of "-1" means "compute". |
| 1601 |
j = 0; |
| 1602 |
for (i = 0; i < numComputeRanks + numIORanks; ++i) { |
| 1603 |
rankAssignments[i] = isIORank(i, totalNumNodes, numIONodes) ? |
| 1604 |
fieldAssignments[j++] : -1; |
| 1605 |
} |
| 1606 |
// Sanity check. Check the assignment for this rank. |
| 1607 |
if (isIO == thisRanksType) { |
| 1608 |
ASSERT(fieldAssignments[newIntracommRank] == rankAssignments[parentRank]); |
| 1609 |
} else { |
| 1610 |
ASSERT(-1 == rankAssignments[parentRank]); |
| 1611 |
} |
| 1612 |
ASSERT(j == numIORanks); |
| 1613 |
|
| 1614 |
if (0 == parentRank) { |
| 1615 |
printf("\ncpu assignments, epoch style %d\n", epochStyleIndex); |
| 1616 |
for (i = 0; i < numComputeNodes + numIONodes ; ++i) { |
| 1617 |
if (rankAssignments[i*numRanksPerNode] >= 0) { |
| 1618 |
// i/o node |
| 1619 |
for (j = 0; j < numRanksPerNode; ++j) { |
| 1620 |
printf(" %1d", rankAssignments[i*numRanksPerNode + j]); |
| 1621 |
} |
| 1622 |
} else { |
| 1623 |
// compute node |
| 1624 |
for (j = 0; j < numRanksPerNode; ++j) { |
| 1625 |
if ((i*numRanksPerNode + j) >= (numComputeRanks + numIORanks)) break; |
| 1626 |
ASSERT(-1 == rankAssignments[i*numRanksPerNode + j]); |
| 1627 |
} |
| 1628 |
printf(" #"); |
| 1629 |
for (; j < numRanksPerNode; ++j) { // "excess" ranks (if any) |
| 1630 |
printf("X"); |
| 1631 |
} |
| 1632 |
} |
| 1633 |
printf(" "); |
| 1634 |
} |
| 1635 |
printf("\n\n"); |
| 1636 |
} |
| 1637 |
|
| 1638 |
// Find the lowest rank assigned to computation; use it as |
| 1639 |
// the "root" for the upcoming intercomm creates. |
| 1640 |
for (compRoot = 0; rankAssignments[compRoot] != -1; ++compRoot); |
| 1641 |
// Sanity check |
| 1642 |
if ((isCompute == thisRanksType) && (0 == newIntracommRank)) ASSERT(compRoot == parentRank); |
| 1643 |
|
| 1644 |
|
| 1645 |
// If this is an I/O rank, split the newIntracomm (which, for |
| 1646 |
// the i/o ranks, is a communicator among all the i/o ranks) |
| 1647 |
// into the pieces as assigned. |
| 1648 |
|
| 1649 |
if (isIO == thisRanksType) { |
| 1650 |
MPI_Comm fieldIntracomm; |
| 1651 |
int myField = rankAssignments[parentRank]; |
| 1652 |
ASSERT(myField >= 0); |
| 1653 |
|
| 1654 |
MPI_Comm_split(newIntracomm, myField, parentRank, &fieldIntracomm); |
| 1655 |
thisEpochStyle[myField].ioRanksIntracomm = fieldIntracomm; |
| 1656 |
|
| 1657 |
// Now create an inter-communicator between the computational |
| 1658 |
// ranks, and each of the newly split off field communicators. |
| 1659 |
for (i = 0; i < nF; ++i) { |
| 1660 |
|
| 1661 |
// Do one field at a time to avoid clashes on parentComm |
| 1662 |
MPI_Barrier(newIntracomm); |
| 1663 |
|
| 1664 |
if (myField != i) continue; |
| 1665 |
|
| 1666 |
// Create the intercomm for this field for this epoch style |
| 1667 |
MPI_Intercomm_create(fieldIntracomm, 0, parentComm, |
| 1668 |
compRoot, i, &(thisEpochStyle[myField].dataIntercomm)); |
| 1669 |
|
| 1670 |
// ... and dup a separate one for tile registrations |
| 1671 |
MPI_Comm_dup(thisEpochStyle[myField].dataIntercomm, |
| 1672 |
&(thisEpochStyle[myField].registrationIntercomm)); |
| 1673 |
} |
| 1674 |
} |
| 1675 |
else { |
| 1676 |
// This is a computational rank; create the intercommunicators |
| 1677 |
// to the various split off separate field communicators. |
| 1678 |
|
| 1679 |
for (fieldIndex = 0; fieldIndex < nF; ++fieldIndex) { |
| 1680 |
|
| 1681 |
// Find the remote "root" process for this field |
| 1682 |
int fieldRoot = -1; |
| 1683 |
while (rankAssignments[++fieldRoot] != fieldIndex); |
| 1684 |
|
| 1685 |
// Create the intercomm for this field for this epoch style |
| 1686 |
MPI_Intercomm_create(newIntracomm, 0, parentComm, fieldRoot, |
| 1687 |
fieldIndex, &(thisEpochStyle[fieldIndex].dataIntercomm)); |
| 1688 |
|
| 1689 |
// ... and dup a separate one for tile registrations |
| 1690 |
MPI_Comm_dup(thisEpochStyle[fieldIndex].dataIntercomm, |
| 1691 |
&(thisEpochStyle[fieldIndex].registrationIntercomm)); |
| 1692 |
} |
| 1693 |
} |
| 1694 |
|
| 1695 |
} // epoch style loop |
| 1696 |
|
| 1697 |
|
| 1698 |
// I/O processes split off and start receiving data |
| 1699 |
// NOTE: the I/O processes DO NOT RETURN from this call |
| 1700 |
if (isIO == thisRanksType) ioRankMain(totalNumTiles); |
| 1701 |
|
| 1702 |
|
| 1703 |
// The "compute" processes now return with their new intra-communicator. |
| 1704 |
*rtnComputeComm = newIntracomm; |
| 1705 |
|
| 1706 |
// Before returning, do the mpi-i/o size and datatype initialization.
| 1707 |
initSizesAndTypes(); |
| 1708 |
|
| 1709 |
return; |
| 1710 |
} |
| 1711 |
|
| 1712 |
|
| 1713 |
|
| 1714 |
|
| 1715 |
// "Register" the tile-id(s) that this process will be sending |
| 1716 |
void |
| 1717 |
f2(int tileID) |
| 1718 |
{ |
| 1719 |
int i, epochStyleIndex; |
| 1720 |
|
| 1721 |
static int count = 0; |
| 1722 |
|
| 1723 |
// This code assumes that the same tileID will apply to all the |
| 1724 |
// fields. Note that we use the tileID as a tag, so we require it |
| 1725 |
// be an int with a legal tag value, but we do *NOT* actually use |
| 1726 |
// the tag *value* for anything (e.g. it is NOT used as an index |
| 1727 |
// into an array). It is treated as an opaque handle that is |
| 1728 |
// passed from the compute task(s) to the compositing routines. |
| 1729 |
// Those two end-points likely assign meaning to the tileID (i.e. |
| 1730 |
// use it to identify where the tile belongs within the domain), |
| 1731 |
// but that is their business. |
| 1732 |
// |
| 1733 |
// A tileID of -1 signifies the end of registration. |
| 1734 |
|
| 1735 |
ASSERT(computeIntracommRank >= 0); |
| 1736 |
|
| 1737 |
if (tileID >= 0) { |
| 1738 |
// In this version of the code, in addition to the tileID, we also |
| 1739 |
// multiplex in the low order bit(s) of the epoch number as an |
| 1740 |
// error check. So the tile value must be small enough to allow that. |
| 1741 |
ASSERT(((tileID<<numCheckBits) + ((1<<numCheckBits)-1)) < maxTagValue); |
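
// Worked example (hypothetical numbers): with numCheckBits = 2, a tileID
// of 1080 is sent with tags 4320..4323 depending on the low two bits of
// the epoch, so we require (1080 << 2) + 3 = 4323 < maxTagValue.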
| 1742 |
|
| 1743 |
// In this version of the code, we do not send the actual tileID's |
| 1744 |
// to the i/o processes during the registration procedure. We only |
| 1745 |
// send a count of the number of tiles that we will send. |
| 1746 |
++count; |
| 1747 |
return; |
| 1748 |
} |
| 1749 |
|
| 1750 |
|
| 1751 |
// We get here when we are called with a negative tileID, signifying |
| 1752 |
// the end of registration. We now need to figure out and inform |
| 1753 |
// *each* i/o process in *each* field just how many tiles we will be |
| 1754 |
// sending them in *each* epoch style. |
| 1755 |
|
| 1756 |
for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) { |
| 1757 |
fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex]; |
| 1758 |
|
| 1759 |
int curFieldIndex = -1; |
| 1760 |
while ('\0' != thisEpochStyle[++curFieldIndex].dataFieldID) { |
| 1761 |
fieldInfoThisEpoch_t *thisField = &(thisEpochStyle[curFieldIndex]); |
| 1762 |
int numRemoteRanks, *tileCounts, remainder; |
| 1763 |
MPI_Comm_remote_size(thisField->dataIntercomm, &numRemoteRanks); |
| 1764 |
|
| 1765 |
tileCounts = alloca(numRemoteRanks * sizeof(int)); |
| 1766 |
|
| 1767 |
memset(tileCounts,0,numRemoteRanks * sizeof(int)); |
| 1768 |
|
| 1769 |
// Distribute the tiles among the i/o ranks. |
| 1770 |
for (i = 0; i < numRemoteRanks; ++i) { |
| 1771 |
tileCounts[i] = count / numRemoteRanks; |
| 1772 |
} |
| 1773 |
// Deal with any remainder |
| 1774 |
remainder = count % numRemoteRanks;
| 1775 |
for (i = 0; i < remainder; ++i) { |
| 1776 |
int target = (computeIntracommRank + i) % numRemoteRanks; |
| 1777 |
tileCounts[target] += 1; |
| 1778 |
} |
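
// Worked example (hypothetical numbers): count = 7 tiles, 3 remote i/o
// ranks -> tileCounts starts as {2,2,2}, remainder = 1, and a sender with
// computeIntracommRank = 5 bumps rank (5+0)%3 = 2, giving {2,2,3}.
// Starting the remainder at computeIntracommRank spreads the extra tiles
// across different i/o ranks for different senders.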
| 1779 |
|
| 1780 |
// Communicate these counts to the i/o processes for this
| 1781 |
// field by sending each one a zero-byte message whose *tag* is
| 1782 |
// the count.  Note that we send a message to each process,
// even if its count is zero.
| 1783 |
for (i = 0; i < numRemoteRanks; ++i) { |
| 1784 |
MPI_Send(NULL, 0, MPI_BYTE, i, tileCounts[i], |
| 1785 |
thisField->registrationIntercomm); |
| 1786 |
} |
| 1787 |
|
| 1788 |
} // field loop |
| 1789 |
} // epoch-style loop |
| 1790 |
|
| 1791 |
} |
| 1792 |
|
| 1793 |
|
| 1794 |
|
| 1795 |
|
| 1796 |
int currentEpochID = 0; |
| 1797 |
int currentEpochStyle = 0; |
| 1798 |
|
| 1799 |
void |
| 1800 |
beginNewEpoch(int newEpochID, int gcmIter, int epochStyle) |
| 1801 |
{ |
| 1802 |
fieldInfoThisEpoch_t *p; |
| 1803 |
|
| 1804 |
if (newEpochID != (currentEpochID + 1)) { |
| 1805 |
fprintf(stderr, "ERROR: Missing i/o epoch? was %d, is now %d ??\n", |
| 1806 |
currentEpochID, newEpochID); |
| 1807 |
sleep(1); // Give the message a chance to propagate |
| 1808 |
abort(); |
| 1809 |
} |
| 1810 |
|
| 1811 |
//////////////////////////////////////////////////////////////////////// |
| 1812 |
// Need to be sure the i/o tasks are done processing the previous epoch |
| 1813 |
// before any compute tasks start sending tiles from a new epoch. |
| 1814 |
|
| 1815 |
if (0 == computeIntracommRank) { |
| 1816 |
int cmd[4] = { cmd_newEpoch, newEpochID, epochStyle, gcmIter }; |
| 1817 |
|
| 1818 |
// Wait to get an ack that the i/o tasks are done with the prior epoch. |
| 1819 |
MPI_Recv(NULL, 0, MPI_BYTE, 0, cmd_epochComplete, |
| 1820 |
globalIntercomm, MPI_STATUS_IGNORE); |
| 1821 |
|
| 1822 |
// Tell the i/o ranks about the new epoch. |
| 1823 |
// (Just tell rank 0; it will bcast to the others) |
| 1824 |
MPI_Send(cmd, 4, MPI_INT, 0, 0, globalIntercomm); |
| 1825 |
} |
| 1826 |
|
| 1827 |
// Compute ranks wait here until rank 0 gets the ack from the i/o ranks |
| 1828 |
MPI_Barrier(computeIntracomm); |
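
// The handshake implemented above, per epoch:
//   1. compute rank 0 waits for a cmd_epochComplete ack from i/o rank 0
//      (the previous epoch has been fully processed);
//   2. compute rank 0 sends {cmd_newEpoch, epochID, style, gcmIter} to
//      i/o rank 0, which broadcasts it to the other i/o ranks;
//   3. all compute ranks wait at the barrier above before sending tiles.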
| 1829 |
|
| 1830 |
|
| 1831 |
currentEpochID = newEpochID; |
| 1832 |
currentEpochStyle = epochStyle; |
| 1833 |
|
| 1834 |
// Reset the tileCount (used by f3) |
| 1835 |
for (p = epochStyles[currentEpochStyle]; p->dataFieldID != '\0'; ++p) { |
| 1836 |
p->tileCount = 0; |
| 1837 |
} |
| 1838 |
} |
| 1839 |
|
| 1840 |
|
| 1841 |
void |
| 1842 |
f3(char dataFieldID, int tileID, int epochID, void *data) |
| 1843 |
{ |
| 1844 |
int receiver, tag;
| 1845 |
|
| 1846 |
static char priorDataFieldID = '\0'; |
| 1847 |
static int priorEpoch = -1; |
| 1848 |
static int remoteCommSize; |
| 1849 |
static fieldInfoThisEpoch_t *p; |
| 1850 |
|
| 1851 |
int flag=0; |
| 1852 |
|
| 1853 |
// Check that this global has been set |
| 1854 |
ASSERT(computeIntracommRank >= 0); |
| 1855 |
|
| 1856 |
|
| 1857 |
// Figure out some info about this dataFieldID. It is |
| 1858 |
// likely to be another tile from the same field as last time |
| 1859 |
// we were called, in which case we can reuse the "static" values |
| 1860 |
// retained from the prior call. If not, we need to look it up. |
| 1861 |
|
| 1862 |
if ((dataFieldID != priorDataFieldID) || (epochID != priorEpoch)) { |
| 1863 |
|
| 1864 |
// It's not the same; we need to look it up. |
| 1865 |
|
| 1866 |
for (p = epochStyles[currentEpochStyle]; p->dataFieldID != '\0'; ++p) { |
| 1867 |
if (p->dataFieldID == dataFieldID) break; |
| 1868 |
} |
| 1869 |
// Make sure we found a valid field |
| 1870 |
|
| 1871 |
ASSERT(p->dataIntercomm != MPI_COMM_NULL); |
| 1872 |
|
| 1873 |
MPI_Comm_remote_size(p->dataIntercomm, &remoteCommSize); |
| 1874 |
|
| 1875 |
flag=1; |
| 1876 |
|
| 1877 |
} |
| 1878 |
|
| 1879 |
ASSERT(p->dataFieldID == dataFieldID); |
| 1880 |
|
| 1881 |
receiver = (computeIntracommRank + p->tileCount) % remoteCommSize; |
| 1882 |
|
| 1883 |
tag = (tileID << numCheckBits) | (epochID & bitMask); |
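
// Worked example (hypothetical numbers): computeIntracommRank = 5,
// p->tileCount = 3, remoteCommSize = 4 -> receiver = (5+3)%4 = 0.
// With numCheckBits = 2, tileID = 189 and epochID = 6 give
// tag = (189 << 2) | (6 & 3) = 758; the i/o side can recover the tile as
// tag >> 2 and use the low bits as an epoch consistency check.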
| 1884 |
|
| 1885 |
//fprintf(stderr,"%d %d\n",flag,tileID); |
| 1886 |
|
| 1887 |
/* |
| 1888 |
if (tileID==189){ |
| 1889 |
int i,j; |
| 1890 |
for (i=0;i<TILE_Y;++i){ |
| 1891 |
for (j=0;j<TILE_X;++j) |
| 1892 |
fprintf(stderr,"%f ",((double*)data)[872+i*108+j]); |
| 1893 |
fprintf(stderr,"\n"); |
| 1894 |
} |
| 1895 |
} |
| 1896 |
|
| 1897 |
*/ |
| 1898 |
|
| 1899 |
|
| 1900 |
|
| 1901 |
FPRINTF(stderr,"Rank %d sends field '%c', tile %d, to i/o task %d with tag %d(%d)\n", |
| 1902 |
computeIntracommRank, dataFieldID, tileID, receiver, tag, tag >> numCheckBits); |
| 1903 |
|
| 1904 |
MPI_Send(data, tileOneZLevelSizeInBytes * p->zDepth, |
| 1905 |
MPI_BYTE, receiver, tag, p->dataIntercomm); |
| 1906 |
|
| 1907 |
p->tileCount += 1; |
| 1908 |
priorDataFieldID = dataFieldID; |
| 1909 |
priorEpoch = epochID; |
| 1910 |
} |
| 1911 |
|
| 1912 |
|
| 1913 |
|
| 1914 |
void |
| 1915 |
f4(int epochID) |
| 1916 |
{ |
| 1917 |
int i; |
| 1918 |
ASSERT(computeIntracommRank >= 0); |
| 1919 |
|
| 1920 |
if (0 == computeIntracommRank) { |
| 1921 |
int cmd[3] = { cmd_exit, -1, -1 }; |
| 1922 |
|
| 1923 |
// Recv the ack that the prior i/o epoch is complete |
| 1924 |
MPI_Recv(NULL, 0, MPI_BYTE, 0, cmd_epochComplete, |
| 1925 |
globalIntercomm, MPI_STATUS_IGNORE); |
| 1926 |
|
| 1927 |
// Tell the i/o ranks to exit |
| 1928 |
// Just tell rank 0; it will bcast to the others |
| 1929 |
MPI_Send(cmd, 3, MPI_INT, 0, 0, globalIntercomm); |
| 1930 |
} |
| 1931 |
|
| 1932 |
} |
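
// Minimal usage sketch (hypothetical; nothing in this file calls it): the
// calling sequence a compute rank is expected to follow.  The rank/tile
// counts, the field ID 'u', the tile IDs, the epoch numbers, and
// exampleBuffer are made-up values for illustration; exampleBuffer must
// be at least tileOneZLevelSizeInBytes * zDepth bytes for the field sent.
static void
exampleComputeRankDriver(MPI_Comm parentComm, void *exampleBuffer)
{
    MPI_Comm computeComm;

    // Every rank in parentComm calls f1; i/o ranks never return from it.
    f1(parentComm, 925, 1080, &computeComm);

    // Register (i.e. count) the tiles this rank will send; -1 ends registration.
    f2(17);
    f2(18);
    f2(-1);

    // For each i/o epoch: announce it, then send this rank's tiles.
    beginNewEpoch(1, 0, 0);
    f3('u', 17, 1, exampleBuffer);
    f3('u', 18, 1, exampleBuffer);

    // After the last epoch, tell the i/o ranks to finish up and exit.
    f4(1);
}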
| 1933 |
|
| 1934 |
|
| 1935 |
|
| 1936 |
void myasyncio_set_global_sizes_(int *nx, int *ny, int *nz, |
| 1937 |
int *nt, int *nb, |
| 1938 |
int *tx, int *ty, |
| 1939 |
int *ox, int *oy) |
| 1940 |
{ |
| 1941 |
|
| 1942 |
|
| 1943 |
int data[] = {*nx,*ny,*nz,*nt,*nb,*tx,*ty,*ox,*oy}; |
| 1944 |
|
| 1945 |
int items = sizeof(data)/sizeof(int); |
| 1946 |
|
| 1947 |
|
| 1948 |
NTILES = *nt; // total number of tiles |
| 1949 |
|
| 1950 |
|
| 1951 |
TILE_X = *tx; |
| 1952 |
TILE_Y = *ty; |
| 1953 |
XGHOSTS = *ox; |
| 1954 |
YGHOSTS = *oy; |
| 1955 |
tileOneZLevelItemCount = ((TILE_X + 2*XGHOSTS) * (TILE_Y + 2*YGHOSTS)); |
| 1956 |
tileOneZLevelSizeInBytes = (tileOneZLevelItemCount * datumSize); // needed by compute ranks in f3 |
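
// Worked example (hypothetical tile shape): TILE_X = 102, TILE_Y = 102,
// XGHOSTS = YGHOSTS = 3 -> 108 * 108 = 11664 items per Z level, i.e.
// 93312 bytes of doubles; a full-depth field multiplies this by zDepth.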
| 1957 |
|
| 1958 |
|
| 1959 |
int rank,ierr; |
| 1960 |
MPI_Comm_rank(globalIntercomm,&rank); |
| 1961 |
|
| 1962 |
|
| 1963 |
if (0==rank) |
| 1964 |
printf("%d %d %d\n%d %d\n%d %d\n%d %d\n",*nx,*ny,*nz,*nt,*nb,*tx,*ty,*ox,*oy); |
| 1965 |
|
| 1966 |
|
| 1967 |
/* |
| 1968 |
if (0==rank) |
| 1969 |
ierr=MPI_Bcast(&NTILES,1,MPI_INT,MPI_ROOT,globalIntercomm); |
| 1970 |
else |
| 1971 |
ierr=MPI_Bcast(&NTILES,1,MPI_INT,MPI_PROC_NULL,globalIntercomm); |
| 1972 |
*/ |
| 1973 |
|
| 1974 |
if (0==rank) |
| 1975 |
ierr=MPI_Bcast(data,items,MPI_INT,MPI_ROOT,globalIntercomm); |
| 1976 |
else |
| 1977 |
ierr=MPI_Bcast(data,items,MPI_INT,MPI_PROC_NULL,globalIntercomm); |
| 1978 |
|
| 1979 |
} |
| 1980 |
|
| 1981 |
void asyncio_tile_arrays_(int *xoff, int *yoff, int *xskip) |
| 1982 |
{ |
| 1983 |
int rank,ierr; |
| 1984 |
MPI_Comm_rank(globalIntercomm,&rank); |
| 1985 |
|
| 1986 |
if (0==rank) |
| 1987 |
ierr=MPI_Bcast(xoff,NTILES,MPI_INT,MPI_ROOT,globalIntercomm); |
| 1988 |
else |
| 1989 |
ierr=MPI_Bcast(xoff,NTILES,MPI_INT,MPI_PROC_NULL,globalIntercomm); |
| 1990 |
|
| 1991 |
if (0==rank) |
| 1992 |
ierr=MPI_Bcast(yoff,NTILES,MPI_INT,MPI_ROOT,globalIntercomm); |
| 1993 |
else |
| 1994 |
ierr=MPI_Bcast(yoff,NTILES,MPI_INT,MPI_PROC_NULL,globalIntercomm); |
| 1995 |
|
| 1996 |
if (0==rank) |
| 1997 |
ierr=MPI_Bcast(xskip,NTILES,MPI_INT,MPI_ROOT,globalIntercomm); |
| 1998 |
else |
| 1999 |
ierr=MPI_Bcast(xskip,NTILES,MPI_INT,MPI_PROC_NULL,globalIntercomm); |
| 2000 |
|
| 2001 |
} |
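
// Minimal sketch of the intercommunicator-broadcast idiom used above (an
// illustrative helper; the existing code calls MPI_Bcast directly): in the
// sending group, the root passes MPI_ROOT and every other rank passes
// MPI_PROC_NULL; ranks in the receiving group would pass the sender's rank
// (0 here) as the root argument on their side.
static void
intercommBcastFromLocalRank0(void *buf, int count, MPI_Datatype type,
                             int myLocalRank, MPI_Comm intercomm)
{
    MPI_Bcast(buf, count, type,
              (0 == myLocalRank) ? MPI_ROOT : MPI_PROC_NULL,
              intercomm);
}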
| 2002 |
|
| 2003 |
|
| 2004 |
|
| 2005 |
|
| 2006 |
|
| 2007 |
////////////////////////////////////////////////////////////////////// |
| 2008 |
// Fortran interface |
| 2009 |
|
| 2010 |
void |
| 2011 |
bron_f1( |
| 2012 |
MPI_Fint *ptr_parentComm, |
| 2013 |
int *ptr_numComputeRanks, |
| 2014 |
int *ptr_totalNumTiles, |
| 2015 |
MPI_Fint *rtnComputeComm) |
| 2016 |
{ |
| 2017 |
// Convert the Fortran handle into a C handle |
| 2018 |
MPI_Comm newComm, parentComm = MPI_Comm_f2c(*ptr_parentComm); |
| 2019 |
|
| 2020 |
f1(parentComm, *ptr_numComputeRanks, *ptr_totalNumTiles, &newComm); |
| 2021 |
|
| 2022 |
// Convert the C handle into a Fortran handle |
| 2023 |
*rtnComputeComm = MPI_Comm_c2f(newComm); |
| 2024 |
} |
| 2025 |
|
| 2026 |
|
| 2027 |
|
| 2028 |
void |
| 2029 |
bron_f2(int *ptr_tileID) |
| 2030 |
{ |
| 2031 |
f2(*ptr_tileID); |
| 2032 |
} |
| 2033 |
|
| 2034 |
|
| 2035 |
void |
| 2036 |
beginnewepoch_(int *newEpochID, int *gcmIter, int *epochStyle) |
| 2037 |
{ |
| 2038 |
beginNewEpoch(*newEpochID, *gcmIter, *epochStyle); |
| 2039 |
} |
| 2040 |
|
| 2041 |
|
| 2042 |
void |
| 2043 |
bron_f3(char *ptr_dataFieldID, int *ptr_tileID, int *ptr_epochID, void *data) |
| 2044 |
{ |
| 2045 |
f3(*ptr_dataFieldID, *ptr_tileID, *ptr_epochID, data); |
| 2046 |
} |
| 2047 |
|
| 2048 |
|
| 2049 |
|
| 2050 |
void |
| 2051 |
bron_f4(int *ptr_epochID) |
| 2052 |
{ |
| 2053 |
f4(*ptr_epochID); |
| 2054 |
} |
| 2055 |
|