PlasCom2  1.0
XPACC Multi-physics simulation application
COMM.C
1 #include "COMM.H"
7 
8 namespace ix {
9  namespace comm {
10  int MobileObject::PrepareBuffer(size_t bsize)
11  {
12  if(_buf && _mine)
13  delete [] (char *)_buf;
14  _buf = new char [bsize];
15  if(!_buf)
16  return 0;
17  _bsize = bsize;
18  _mine = true;
19  return _bsize;
20  };
21 
22  int MobileObject::Pack(void **inbuf)
23  {
24  if(_buf && _mine){
25  delete [] (char *)_buf;
26  _buf = NULL;
27  _mine = false;
28  }
29  std::ostringstream Ostr;
30  // Ostr << *this;
31  if(!inbuf){
32  int nchar = util::String2Buf(Ostr.str(),&_buf);
33  if(_buf){
34  _mine = true;
35  _bsize = nchar;
36  return(_bsize);
37  }
38  }
39  else {
40  // Caller must free inbuf, else it leaks
41  int nchar = util::String2Buf(Ostr.str(),inbuf);
42  if(*inbuf){
43  _buf = *inbuf;
44  _mine = false;
45  _bsize = nchar;
46  return(_bsize);
47  }
48  }
49  return 0;
50  };
51 
52  int MobileObject::UnPack(const void *inbuf)
53  {
54  if(!inbuf){
55  if(!_buf)
56  return 1;
57  std::istringstream Istr(std::string((const char *)_buf,(const char *)_buf+_bsize));
58  // Istr >> *this;
59  if(_mine){
60  delete [] (char *)_buf;
61  _buf = NULL;
62  _mine = false;
63  _bsize = 0;
64  }
65  return(0);
66  }
67  else{
68  std::istringstream Istr(std::string((const char *)inbuf));
69  // Istr >> *this;
70  return(0);
71  }
72  return 0;
73  };
74 
75  CommunicatorObject::CommunicatorObject(const CommunicatorObject &incomm) : IntegerTypeID(MPI_INTEGER)
76 
77  {
78  _rank = -1;
79  int mpiInitd = 0;
80  MPI_Initialized(&mpiInitd);
81  if(mpiInitd){
82  MPI_Comm_dup(incomm.GetCommunicator(),&_comm);
83  MPI_Comm_rank(GetCommunicator(),&_rank);
84  MPI_Comm_size(GetCommunicator(),&_nproc);
85  }
86  _master = false;
87  _own_comm = true;
88  _initd = true;
89  _rc = 0;
90  _error = 0;
91  };
92 
93  CommunicatorObject::CommunicatorObject() : IntegerTypeID(MPI_INTEGER)
94 
95  {
96  _rank = -1;
97  _comm = MPI_COMM_WORLD;
98  _master = false;
99  _own_comm = false;
100  _initd = false;
101  _rc = 0;
102  _nproc = 0;
103  _error = 0;
104  };
105  CommunicatorObject::CommunicatorObject(MPI_Comm &incomm) : IntegerTypeID(MPI_INTEGER)
106  {
107  _error = 0;
108  _rc = MPI_Comm_rank(incomm,&_rank);
109  if(_rc != MPI_SUCCESS)
110  _error = _rc;
111  _comm = incomm;
112  _master = false;
113  _own_comm = false;
114  _initd = true;
115  _rc = MPI_Comm_size(_comm,&_nproc);
116  if(_rc != MPI_SUCCESS)
117  _error = _rc;
118  };
119 
120  CommunicatorObject::CommunicatorObject(int* narg,char*** args) : IntegerTypeID(MPI_INTEGER)
121  {
122  _nproc = 0;
123  _rc = Initialize(narg,args);
124  assert(_rc == 0);
125  _rank = -1;
126  _error = 0;
127  _own_comm = false;
128  _master = true;
129  _initd = true;
130  };
131  int CommunicatorObject::RenewCommunicator(MPI_Comm &inComm)
132  {
133  int newRank = 0;
134  int newSize = 0;
135  int retVal = MPI_Comm_rank(inComm,&newRank);
136  if(!retVal)
137  retVal = MPI_Comm_size(inComm,&newSize);
138  if(retVal)
139  return(retVal);
140  ClearRequests();
141  ClearErr();
142  _comm = inComm;
143  _own_comm = false;
144  _master = false;
145  _initd = true;
146  _rank = newRank;
147  _nproc = newSize;
148  _rc = 0;
149  return(0);
150  };
151  int CommunicatorObject::Split(int color,int key,CommunicatorObject &newcomm)
152  {
153  _rc = MPI_Comm_split(_comm,color,key,&(newcomm._comm));
154  if(_rc)
155  return(_rc);
156  newcomm._master = false;
157  newcomm._own_comm = true;
158  newcomm._initd = true;
159  _rc = MPI_Comm_rank(newcomm._comm,&(newcomm._rank));
160  if(_rc)
161  return(_rc);
162  _rc = MPI_Comm_size(newcomm._comm,&(newcomm._nproc));
163  return(_rc);
164  };
165 
166  int CommunicatorObject::ComputeCartesianDims(int numNodes,int numDims)
167  {
168  if(_cart_dims.size() != numDims)
169  _cart_dims.resize(numDims,0);
170  return(MPI_Dims_create(numNodes,numDims,&_cart_dims[0]));
171  };
172 
173  int CommunicatorObject::CartNeighbors(std::vector<int> &neighborRanks)
174  {
175  if(_cart_dims.empty())
176  return(1);
177  int topoType = MPI_UNDEFINED;
178  MPI_Topo_test(_comm,&topoType);
179  if(topoType != MPI_CART)
180  return(1);
181  int retVal = 0;
182  int numDims = _cart_dims.size();
183  neighborRanks.resize(numDims*2,-1);
184  int neighborPlus = MPI_PROC_NULL;
185  int neighborMinus = MPI_PROC_NULL;
186  for(int iDim = 0;((iDim < numDims) && !retVal);iDim++){
187  retVal = MPI_Cart_shift(_comm,iDim,1,&neighborMinus,&neighborPlus);
188  if(neighborMinus != MPI_PROC_NULL)
189  neighborRanks[iDim*2] = neighborMinus;
190  if(neighborPlus != MPI_PROC_NULL)
191  neighborRanks[iDim*2+1] = neighborPlus;
192  }
193  return(retVal);
194  };
195 
196  std::string CommunicatorObject::Hostname(){
197  char hostName[MPI_MAX_PROCESSOR_NAME];
198  int hostLen = 0;
199  std::string retVal;
200  MPI_Get_processor_name(hostName,&hostLen);
201  if(hostLen > 0)
202  retVal.assign(hostName,hostLen);
203  return(retVal);
204  };
205 
206  int
207  CommunicatorObject::InitializeCartesianTopology(int numNodes,int numDims,std::vector<int> &dimDir,
208  const std::vector<int> &isPeriodic,bool reOrder,
209  CommunicatorObject &cartComm)
210  {
211  cartComm._cart_coords.resize(numDims,0);
212  cartComm._cart_dims.resize(numDims,0);
213  cartComm._cart_periodic.resize(numDims,0);
214  bool autoDetermineSize = false;
215  if(dimDir.size() != numDims)
216  autoDetermineSize = true;
217  int nProcCart = 1;
218  std::vector<int>::iterator dimDirIt = dimDir.begin();
219  while(dimDirIt != dimDir.end())
220  nProcCart *= *dimDirIt++;
221  if(nProcCart <= 0)
222  autoDetermineSize = true;
223  if(autoDetermineSize){
224  dimDir.resize(numDims,0);
225  if(cartComm.ComputeCartesianDims(numNodes,numDims) != MPI_SUCCESS)
226  return(1);
227  dimDir = cartComm._cart_dims;
228  } else {
229  cartComm._cart_dims = dimDir;
230  }
231  if(isPeriodic.size() == numDims){
232  cartComm._cart_periodic = isPeriodic;
233  }
234  cartComm._cart_comm = MPI_COMM_NULL;
235  _cart_comm = MPI_COMM_NULL;
236  if(MPI_Cart_create(_comm,numDims,&(cartComm._cart_dims[0]),&(cartComm._cart_periodic[0]),
237  reOrder,&_cart_comm) != MPI_SUCCESS){
238  _cart_comm = MPI_COMM_NULL;
239  cartComm._cart_coords.resize(0);
240  cartComm._cart_dims.resize(0);
241  cartComm._cart_periodic.resize(0);
242  return(1);
243  }
244  int tempRank = 0;
245  MPI_Comm_rank(_cart_comm,&tempRank);
246  if(MPI_Cart_coords(_cart_comm,tempRank,numDims,&(cartComm._cart_coords[0])) != MPI_SUCCESS){
247  return(1);
248  }
249  if(MPI_Cart_rank(_cart_comm,&(cartComm._cart_coords[0]),&(cartComm._cart_rank)) != MPI_SUCCESS){
250  // std::cout << "tempRank/cartrank (" << tempRank << "/" << _cart_rank
251  // << ")" << std::endl;
252  return(1);
253  }
254  return(cartComm.RenewCommunicator(_cart_comm));
255  };
256 
257  int CommunicatorObject::WaitRecv(int recvid)
258  {
259  _status.resize(1);
260  _rc = MPI_Wait(&_send_requests[recvid],&_status[0]);
261  assert(_rc == 0);
262  return(_rc);
263  };
264 
265  // Wait on any outstanding persistent requests, then clear them.
266  int CommunicatorObject::WaitAll()
267  {
268  std::vector<MPI_Request> requests;
269  std::vector<MPI_Request>::iterator ri = _send_requests.begin();
270  while(ri != _send_requests.end())
271  requests.push_back(*ri++);
272  ri = _recv_requests.begin();
273  while(ri != _recv_requests.end())
274  requests.push_back(*ri++);
275  int count = requests.size();
276  _status.resize(count);
277  _rc = MPI_Waitall(count,&requests[0],&_status[0]);
278  assert(_rc == 0);
279  ri = requests.begin();
280  while(ri != requests.end()){
281  // assert(*ri++ == MPI_REQUEST_NULL);
282  if(*ri != MPI_REQUEST_NULL)
283  assert(_status[ri-requests.begin()].MPI_ERROR == 0);
284  ri++;
285  }
286  ClearRequests();
287  return(_rc);
288  };
289 
290  // clear up any persistent requests
291  void CommunicatorObject::ClearRequests()
292  {
293  std::vector<MPI_Request>::iterator ri = _send_requests.begin();
294  // while(ri != _send_requests.end())
295  // MPI_Request_free(&(*ri++));
296  // ri = _recv_requests.begin();
297  // while(ri != _recv_requests.end())
298  // MPI_Request_free(&(*ri++));
299  _send_requests.resize(0);
300  _recv_requests.resize(0);
301  _send_tags.resize(0);
302  _recv_tags.resize(0);
303  };
304 
305  // int StartSend(unsigned int rid);
306  // int SendAll();
307  // int StartRecv(unsigned int rid);
308  // int RecvAll();
309 
310  int CommunicatorObject::Initialize(CommunicatorObject &incomm)
311  {
312  _comm = incomm._comm;
313  _master = false;
314  _own_comm = false;
315  _initd = true;
316  int flag = 0;
317  MPI_Initialized(&flag);
318  assert(flag != 0);
319  if(flag == 0)
320  _initd = false;
321  else{
322  _rc = MPI_Comm_rank(_comm,&_rank);
323  _rc = MPI_Comm_size(_comm,&_nproc);
324  }
325  return(0);
326  };
327 
328  int CommunicatorObject::Initialize(int* narg,char*** args)
329  {
330  int flag = 0;
331  MPI_Initialized(&flag);
332  _comm = MPI_COMM_WORLD;
333  _master = false;
334  _initd = true;
335  _rc = 0;
336  if(flag == 0){
337  _master = true;
338  _own_comm = false;
339  int provided;
340 
341  // Make the MPI runtime aware that we might want to run OpenMP threads,
342  // but will do MPI calls only from the master thread.
343  _rc = MPI_Init_thread(narg, args, MPI_THREAD_FUNNELED, &provided);
344  assert(provided == MPI_THREAD_FUNNELED);
345 
346  // As HDF5 might run MPI functions as part of its shutdown, we can not call
347  // MPI_Finalize directly.
348  atexit((void (*)())MPI_Finalize);
349  }
350  if(_rc == MPI_SUCCESS)
351  _rc = MPI_Comm_size(_comm,&_nproc);
352  return(_rc);
353  };
354 
355  int CommunicatorObject::Check(comm::Ops op)
356  {
357  int errcheck = 0;
358  // For now, reduce the error flags (MAX by default) and check that the result is zero.
359  _rc = MPI_Allreduce(&_error,&errcheck,1,MPI_INTEGER,ResolveOp(op),_comm);
360  return(errcheck);
361  };
362 
363  int CommunicatorObject::Rank()
364  {
365  if(_rank < 0)
366  MPI_Comm_rank(_comm,&_rank);
367  return(_rank);
368  };
369 
370  int CommunicatorObject::Finalize()
371  {
372  // MPI is finalized by the atexit() call installed in ::Initialize.
373  _initd = false;
374  return(0);
375  };
376  int CommunicatorObject::Size()
377  {
378  if(_nproc <= 0)
379  _rc = MPI_Comm_size(_comm,&_nproc);
380  return(_nproc);
381  };
382 
383  CommunicatorObject::~CommunicatorObject()
384  {
385  int flag;
386  int mpiInitd = 0;
387  MPI_Initialized(&mpiInitd);
388  MPI_Finalized(&flag);
389  if(flag == 0 && _own_comm && _initd && mpiInitd){
390  MPI_Comm_free(&_comm);
391  }
392  // MPI is finalized by the atexit() call installed in ::Initialize.
393  };
394 
395  int CommunicatorObject::StartSend(unsigned int rid)
396  {
397  if(_send_requests.empty())
398  return(0);
399  if(rid >= _send_requests.size()){
400  _error = 4;
401  return(1);
402  }
403  _rc = MPI_Start(&_send_requests[rid]);
404  assert(_rc == 0);
405  return(_rc);
406  };
407 
408  int CommunicatorObject::SendAll()
409  {
410  if(_send_requests.empty())
411  return(0);
412  _rc = MPI_Startall(_send_requests.size(),&_send_requests[0]);
413  assert(_rc == 0);
414  return(_rc);
415  };
416 
417  int CommunicatorObject::StartRecv(unsigned int rid)
418  {
419  if(_recv_requests.empty())
420  return(0);
421  if(rid >= _recv_requests.size())
422  return(1);
423  _rc = MPI_Start(&_recv_requests[rid]);
424  assert(_rc == 0);
425  return(_rc);
426  };
427 
428  int CommunicatorObject::RecvAll()
429  {
430  if(_recv_requests.empty())
431  return(0);
432  _rc = MPI_Startall(_recv_requests.size(),&_recv_requests[0]);
433  assert(_rc == 0);
434  return(_rc);
435  };
436 
437  int CommunicatorObject::BroadCast(std::string &sval,int root_rank)
438  {
439  int sizeofobject = 0;
440  if(_rank == root_rank)
441  sizeofobject = sval.size();
442  _rc = MPI_Bcast(&sizeofobject,1,MPI_INT,root_rank,_comm);
443  if(sizeofobject <= 0)
444  return 1;
445  if(!_rc){
446  char *sbuf = new char [sizeofobject+1];
447  if(_rank == root_rank)
448  std::strncpy(sbuf,sval.c_str(),sizeofobject);
449  _rc = MPI_Bcast(sbuf,sizeofobject,MPI_CHAR,root_rank,_comm);
450  if(_rank != root_rank){
451  sbuf[sizeofobject] = '\0';
452  sval.assign(std::string(sbuf));
453  }
454  delete [] sbuf;
455  }
456  return(_rc);
457  };
458 
459 
460  int CommunicatorObject::_ASend(void *buf,int sendsize,
461  unsigned int remote_rank,int tag)
462  {
463  MPI_Request request;
464  int request_id = _send_requests.size();
465  _send_requests.push_back(request);
466  if(tag == -1)
467  tag = _send_tags.size() + 1;
468  _send_tags.push_back(tag);
469  _rc = MPI_Isend(buf,sendsize,MPI_CHAR,remote_rank,
470  tag,_comm,&_send_requests[request_id]);
471  assert(_rc == 0);
472  return(request_id);
473  };
474 
475  int CommunicatorObject::_Send(void *buf,int sendsize,
476  unsigned int remote_rank,int tag)
477  {
478  _rc = MPI_Send(buf,sendsize,MPI_CHAR,remote_rank,
479  tag,_comm);
480  assert(_rc == 0);
481  return(0);
482  };
483 
484  int CommunicatorObject::_SetRecv(void *recvbuf,int recvsize,
485  unsigned int remote_rank,int tag)
486  {
487  MPI_Request request;
488  int request_id = _recv_requests.size();
489  _recv_requests.push_back(request);
490  if(tag == 0)
491  tag = MPI_ANY_TAG;
492  _recv_tags.push_back(tag);
493  _rc = MPI_Recv_init(recvbuf,recvsize,MPI_CHAR,remote_rank,
494  tag,_comm,&_recv_requests[request_id]);
495  assert(_rc == 0);
496  return(request_id);
497  };
498 
499 
500  int CommunicatorObject::_ARecv(void *recvbuf,int recvsize,
501  unsigned int remote_rank,int tag)
502  {
503  MPI_Request request;
504  int request_id = _recv_requests.size();
505  _recv_requests.push_back(request);
506  if(tag == 0)
507  tag = MPI_ANY_TAG;
508  _recv_tags.push_back(tag);
509  _rc = MPI_Irecv(recvbuf,recvsize,MPI_CHAR,remote_rank,
510  tag,_comm,&_recv_requests[request_id]);
511  assert(_rc == 0);
512  return(request_id);
513  };
514 
515  int CommunicatorObject::_Recv(void *recvbuf,int recvsize,
516  unsigned int remote_rank,int tag)
517  {
518  MPI_Status status;
519  if(tag == 0)
520  tag = MPI_ANY_TAG;
521  _rc = MPI_Recv(recvbuf,recvsize,MPI_CHAR,remote_rank,
522  tag,_comm,&status);
523  assert(_rc == 0);
524  return(0);
525  };
526 
527  int CommunicatorObject::_SetSend(void *sendbuf,int sendsize,
528  unsigned int remote_rank,int tag)
529  {
530  MPI_Request request;
531  int request_id = _send_requests.size();
532  _send_requests.push_back(request);
533  if(tag == 0)
534  tag = _send_tags.size() + 1;
535  _send_tags.push_back(tag);
536  _rc = MPI_Send_init(sendbuf,sendsize,MPI_CHAR,remote_rank,
537  tag,_comm,&_send_requests[request_id]);
538  assert(_rc == 0);
539  return(request_id);
540  };
541 
542  int CommunicatorObject::_AllGatherv(void *sendbuf,int mysendcnt,int datasize,void *recvbuf)
543  {
544  std::vector<int> allsizes(_nproc,0);
545  AllGather(mysendcnt,allsizes);
546  std::vector<int>::iterator asi = allsizes.begin();
547  while(asi != allsizes.end()){
548  *asi = *asi*datasize;
549  asi++;
550  }
551  std::vector<int> displacements(_nproc,0);
552  for(int i = 1;i < _nproc;i++)
553  displacements[i] = displacements[i-1]+allsizes[i-1];
554  _rc = MPI_Allgatherv(sendbuf,allsizes[_rank],MPI_CHAR,
555  recvbuf,&allsizes[0],&displacements[0],
556  MPI_CHAR,_comm);
557  assert(_rc == 0);
558  return(_rc);
559  };
560 
561  int CommunicatorObject::_BroadCastMO(MobileObject *mo,int root_rank)
562  {
563  int sizeofobject = 0;
564  if(_rank == root_rank){
565  sizeofobject = mo->Pack();
566  }
567  _rc = MPI_Bcast(&sizeofobject,1,MPI_INT,root_rank,_comm);
568  if(sizeofobject <= 0)
569  return 1;
570  if(!_rc){
571  if(_rank != root_rank)
572  mo->PrepareBuffer(sizeofobject);
573  _rc = MPI_Bcast(mo->GetBuffer(),sizeofobject,MPI_CHAR,root_rank,_comm);
574  if(_rank != root_rank){
575  _rc = mo->UnPack();
576  }
577  }
578  return(_rc);
579  };
580 
581  int CommunicatorObject::_GatherMO(MobileObject *sPtr,std::vector<MobileObject *> &rVec,int sndcnt,int root)
582  {
583  // int recvcnt = sndcnt;
584  int local_size = 0;
585  local_size = sPtr->Pack();
586  std::vector<int> allsizes;
587  std::vector<int> disps;
588  if(_rank == root){
589  allsizes.resize(_nproc,0);
590  disps.resize(_nproc,0);
591  allsizes[_rank] = local_size;
592  }
593  this->Gather(local_size,allsizes,root);
594  // this->Barrier();
595  int total_size = 0;
596  if(_rank == root){
597  for(int i = 0; i < _nproc;i++){
598  total_size += allsizes[i];
599  if(i > 0)
600  disps[i] = disps[i-1]+allsizes[i-1];
601  // total_nitems += allnitems[i];
602  }
603  }
604  char *recvbuffer = NULL;
605  if(_rank == root)
606  recvbuffer = new char [total_size];
607 
608  _rc = MPI_Gatherv((void *)(sPtr->GetBuffer()),local_size,MPI_CHAR,
609  (void *)(recvbuffer),&allsizes[0],&disps[0],MPI_CHAR,
610  root,_comm);
611  // All the send buffers can be destroyed
612  // if(_rank != root)
613  // sPtr->DestroyBuffer();
614  // The root must unpack the received data:
615  if(_rank == root){
616  for(int i = 0;i < _nproc;i++){
617  if(i == root)
618  rVec[i]->UnPack(sPtr->GetBuffer());
619  else
620  rVec[i]->UnPack(&recvbuffer[disps[i]]);
621  }
622  delete [] recvbuffer;
623  }
624  sPtr->DestroyBuffer();
625  return(_rc);
626  };
627 
628  // Note, the mos have to be of the right size on every processor - otherwise we
629  // have no way to generically size an array of unspecified (actual) type.
630  int CommunicatorObject::_BroadCastMOV(std::vector<MobileObject *> &mos,int root_rank)
631  {
632  int nobjs = mos.size();
633  _rc = 0;
634  // This broadcast is superfluous since nobjs should already be
635  // identical on every processor.
636  if((_rc = MPI_Bcast(&nobjs,1,MPI_INT,root_rank,_comm)))
637  return(1);
638  std::vector<int> sizeofobject(nobjs,0);
639  int total_size = 0;
640  if(_rank == root_rank){
641  std::vector<int>::iterator si = sizeofobject.begin();
642  std::vector<MobileObject *>::iterator moi = mos.begin();
643  while(moi != mos.end()){
644  sizeofobject[si - sizeofobject.begin()] = (*moi)->Pack();
645  moi++;
646  si++;
647  }
648  }
649  if((_rc = MPI_Bcast(&sizeofobject[0],nobjs,MPI_INT,root_rank,_comm)))
650  return(1);
651  std::vector<int>::iterator si = sizeofobject.begin();
652  std::vector<MobileObject *>::iterator moi = mos.begin();
653  while(si != sizeofobject.end()){
654  if(_rank != root_rank){
655  int bsize = (*moi++)->PrepareBuffer(*si);
656  assert(bsize == *si);
657  }
658  total_size += *si++;
659  }
660  assert(total_size > 0);
661  if(total_size <= 0)
662  return(1);
663  char *bufferspace = new char [total_size];
664  if(_rank == root_rank){
665  // pack the bufferspace with all the buffers
666  char *cur_pos = bufferspace;
667  si = sizeofobject.begin();
668  moi = mos.begin();
669  while(moi != mos.end()){
670  std::memcpy(cur_pos,(*moi)->GetBuffer(),*si);
671  cur_pos += *si++;
672  moi++;
673  }
674  }
675  if((_rc = MPI_Bcast(bufferspace,total_size,MPI_CHAR,root_rank,_comm))){
676  delete [] bufferspace;
677  return(1);
678  }
679 
680  // Now everyone has all the data
681  if(_rank == root_rank){
682  // root can just destroy his buffers
683  moi = mos.begin();
684  while(moi != mos.end())
685  (*moi++)->DestroyBuffer();
686  }
687  else{ // everyone else needs to unpack
688  char *cur_pos = bufferspace;
689  si = sizeofobject.begin();
690  moi = mos.begin();
691  while(moi != mos.end()){
692  assert((*moi)->GetBuffer() != NULL);
693  std::memcpy((*moi)->GetBuffer(),cur_pos,*si);
694  _rc += (*moi)->UnPack();
695  assert(_rc == 0);
696  cur_pos += *si++;
697  moi++;
698  }
699  }
700  delete [] bufferspace;
701  return(_rc);
702  };
703 
704  int CommunicatorObject::_AllGatherMO(MobileObject *sPtr,std::vector<MobileObject *> &rVec,int sndcnt)
705  {
706  // int recvcnt = sndcnt;
707  int local_size = 0;
708  local_size = sPtr->Pack();
709  std::vector<int> allsizes(_nproc,0);
710  std::vector<int> disps(_nproc,0);
711  allsizes[_rank] = local_size;
712  this->AllGather(local_size,allsizes);
713  int total_size = 0;
714  for(int i = 0; i < _nproc;i++){
715  total_size += allsizes[i];
716  if(i > 0)
717  disps[i] = disps[i-1]+allsizes[i-1];
718  // total_nitems += allnitems[i];
719  }
720  char *recvbuffer = new char [total_size];
721  _rc = MPI_Allgatherv((void *)(sPtr->GetBuffer()),local_size,MPI_CHAR,
722  (void *)(recvbuffer),&allsizes[0],&disps[0],MPI_CHAR,_comm);
723  // The root must unpack the received data:
724  for(int i = 0;i < _nproc;i++){
725  if(i == _rank)
726  rVec[i]->UnPack(sPtr->GetBuffer());
727  else
728  rVec[i]->UnPack(&recvbuffer[disps[i]]);
729  }
730  delete [] recvbuffer;
731  sPtr->DestroyBuffer();
732  return(_rc);
733  };
734 
735  int CommunicatorObject::_GatherMOV(std::vector<MobileObject *> &sVec,std::vector<MobileObject *> &rVec,
736  std::vector<int> &nsend_all,int root)
737  {
738  int sndIcnt = sVec.size();
739  int local_size = 0;
740  std::vector<int> local_sizes(sndIcnt,0);
741  for(int i = 0;i < sndIcnt;i++){
742  local_sizes[i] = sVec[i]->Pack();
743  local_size += local_sizes[i];
744  }
745  std::vector<int> Allsizes;
746  this->Gatherv(local_sizes,Allsizes,nsend_all,root);
747  int total_size = 0;
748  std::vector<int> allsizes(_nproc,0);
749  std::vector<int> disps(_nproc,0);
750  int sindex = 0;
751  for(int i = 0; i < _nproc; i++){
752  for(int j = 0; j < nsend_all[i]; j++){
753  allsizes[i] += Allsizes[sindex++];
754  }
755  total_size += allsizes[i];
756  if(i > 0)
757  disps[i] = disps[i-1]+allsizes[i-1];
758  }
759  char *sendbuffer = new char [local_size];
760  char *recvbuffer = NULL;
761  if(_rank == root)
762  recvbuffer = new char [total_size];
763  // std::vector<MobileObject *>::iterator sVit = sVec.begin();
764  char *cur_pos = sendbuffer;
765  for(int i = 0;i < sndIcnt;i++){
766  std::memcpy(cur_pos,sVec[i]->GetBuffer(),sVec[i]->BufSize());
767  cur_pos += sVec[i]->BufSize();
768  }
769  _rc = MPI_Gatherv((void *)sendbuffer,local_size,MPI_CHAR,
770  (void *)recvbuffer,&allsizes[0],&disps[0],MPI_CHAR,
771  root,_comm);
772  cur_pos = recvbuffer;
773  // unpack each object
774  int rind = 0;
775  int sind = 0;
776  if(_rank == root){
777  for(int i = 0;i < _nproc;i++){
778  for(int j = 0;j < nsend_all[i];j++){
779  if(i == root){
780  rVec[rind++]->UnPack(sendbuffer);
781  sendbuffer += local_sizes[j];
782  cur_pos += local_sizes[j];
783  sind++;
784  }
785  else {
786  rVec[rind++]->UnPack(cur_pos);
787  cur_pos += Allsizes[sind++];
788  }
789  }
790  }
791  delete [] recvbuffer;
792  }
793  return(_rc);
794  };
795 
796  int CommunicatorObject::_AllGatherMOV(std::vector<MobileObject *> &sVec,std::vector<MobileObject *> &rVec,
797  std::vector<int> &nsend_all)
798  {
799  int sndIcnt = sVec.size();
800  int local_size = 0;
801  std::vector<int> local_sizes(sndIcnt,0);
802  for(int i = 0;i < sndIcnt;i++){
803  local_sizes[i] = sVec[i]->Pack();
804  local_size += local_sizes[i];
805  }
806  std::vector<int> Allsizes;
807  this->AllGatherv(local_sizes,Allsizes,nsend_all);
808  int total_size = 0;
809  std::vector<int> allsizes(_nproc,0);
810  std::vector<int> disps(_nproc,0);
811  int sindex = 0;
812  for(int i = 0; i < _nproc; i++){
813  for(int j = 0; j < nsend_all[i]; j++){
814  allsizes[i] += Allsizes[sindex++];
815  }
816  total_size += allsizes[i];
817  if(i > 0)
818  disps[i] = disps[i-1]+allsizes[i-1];
819  }
820  char *sendbuffer = new char [local_size];
821  char *recvbuffer = new char [total_size];
822  char *cur_pos = sendbuffer;
823  for(int i = 0;i < sndIcnt;i++){
824  std::memcpy(cur_pos,sVec[i]->GetBuffer(),sVec[i]->BufSize());
825  cur_pos += sVec[i]->BufSize();
826  }
827  std::vector<MobileObject *>::iterator svi = sVec.begin();
828  while(svi != sVec.end()){
829  (*svi)->DestroyBuffer();
830  svi++;
831  }
832  _rc = MPI_Allgatherv((void *)sendbuffer,local_size,MPI_CHAR,
833  (void *)recvbuffer,&allsizes[0],&disps[0],MPI_CHAR,_comm);
834  cur_pos = recvbuffer;
835  // unpack each object
836  int rind = 0;
837  int sind = 0;
838  for(int i = 0;i < _nproc;i++){
839  for(int j = 0;j < nsend_all[i];j++){
840  if(i == _rank){
841  rVec[rind++]->UnPack(sendbuffer);
842  sendbuffer += local_sizes[j];
843  cur_pos += local_sizes[j];
844  sind++;
845  }
846  else {
847  rVec[rind++]->UnPack(cur_pos);
848  cur_pos += Allsizes[sind++];
849  }
850  }
851  }
852  delete [] recvbuffer;
853  return(_rc);
854  };
855 
856 
857  MPI_Datatype CommunicatorObject::ResolveDataType(const comm::DataTypes &dt)
858  {
859  switch(dt){
860  case comm::DTDOUBLE:
861  return(MPI_DOUBLE);
862  case comm::DTFLOAT:
863  return(MPI_FLOAT);
864  case comm::DTINT:
865  return(MPI_INTEGER);
866  case comm::DTUBYTE:
867  case comm::DTUCHAR:
868  return(MPI_UNSIGNED_CHAR);
869  case comm::DTCHAR:
870  case comm::DTBYTE:
871  return(MPI_CHAR);
872  case comm::DTSIZET:
873  return(MPI_LONG_LONG_INT);
874  case comm::DTUINT:
875  return(MPI_UNSIGNED);
876  default:
877  // return(static_cast<MPI_Datatype>(MPI_DATATYPE_NULL));
878  return(MPI_DATATYPE_NULL);
879  }
880  // should never get here
881  // return(static_cast<MPI_Datatype>(MPI_DATATYPE_NULL));
882  return(MPI_DATATYPE_NULL);
883  };
884 
885  MPI_Op CommunicatorObject::ResolveOp(const comm::Ops &op)
886  {
887  switch(op){
888  case comm::MINOP:
889  return(MPI_MIN);
890  case comm::MAXOP:
891  return(MPI_MAX);
892  case comm::SUMOP:
893  return(MPI_SUM);
894  case comm::PRODOP:
895  return(MPI_PROD);
896  case comm::MAXLOCOP:
897  return(MPI_MAXLOC);
898  case comm::MINLOCOP:
899  return(MPI_MINLOC);
900  default:
901  return(MPI_OP_NULL);
902  }
903  return(MPI_OP_NULL);
904  };
905  };
906 };
907 
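A minimal usage sketch of the classes defined in this file (not part of COMM.C; it assumes the reconstructed member names above, notably Rank(), Check(), and the default constructor, match their declarations in COMM.H):

#include <string>
#include "COMM.H"

int main(int argc, char *argv[])
{
  // Constructing from argc/argv calls MPI_Init_thread with MPI_THREAD_FUNNELED
  // and registers MPI_Finalize via atexit (see Initialize above).
  ix::comm::CommunicatorObject world(&argc, &argv);

  // Broadcast a string from rank 0 to all ranks.
  std::string message;
  if(world.Rank() == 0)
    message = "hello from rank 0";
  world.BroadCast(message, 0);

  // Split into even/odd sub-communicators, keyed by world rank.
  ix::comm::CommunicatorObject evenOdd;
  world.Split(world.Rank() % 2, world.Rank(), evenOdd);

  // Max-reduce the per-rank error flags; zero means no rank reported an error.
  return world.Check(ix::comm::MAXOP);
}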