template <class IT, class NT, class DER>
SpParMat< IT,NT,DER >::SpParMat (std::ifstream & input, MPI_Comm & world)
{
    assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );   // the global index type must be at least as wide as the local one
    if(!input.is_open())
    {
        perror("Input file doesn't exist\n");
        exit(-1);
    }
    commGrid.reset(new CommGrid(world, 0, 0));
    input >> (*spSeq);
}

template <class IT, class NT, class DER>
SpParMat< IT,NT,DER >::SpParMat (DER * myseq, MPI_Comm & world): spSeq(myseq)
{
    assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
    commGrid.reset(new CommGrid(world, 0, 0));
}

template <class IT, class NT, class DER>
SpParMat< IT,NT,DER >::SpParMat (DER * myseq, std::shared_ptr<CommGrid> grid): spSeq(myseq)
{
    assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
    commGrid = grid;
}

template <class IT, class NT, class DER>
SpParMat< IT,NT,DER >::SpParMat (std::shared_ptr<CommGrid> grid)
{
    assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
    spSeq = new DER();
    commGrid = grid;
}

//! Default constructor: discouraged, because it silently uses MPI_COMM_WORLD
template <class IT, class NT, class DER>
SpParMat< IT,NT,DER >::SpParMat ()
{
    SpParHelper::Print("COMBBLAS Warning: It is dangerous to create (matrix) objects without specifying the communicator, are you sure you want to create this object in MPI_COMM_WORLD?\n");
    assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
    spSeq = new DER();
    commGrid.reset(new CommGrid(MPI_COMM_WORLD, 0, 0));
}

template <class IT, class NT, class DER>
SpParMat< IT,NT,DER >::SpParMat (MPI_Comm world)
{
    assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
    spSeq = new DER();
    commGrid.reset(new CommGrid(world, 0, 0));
}

template <class IT, class NT, class DER>
SpParMat< IT,NT,DER >::~SpParMat ()
{
    if(spSeq != NULL)
        delete spSeq;
}

template <class IT, class NT, class DER>
void SpParMat< IT,NT,DER >::FreeMemory ()
{
    if(spSeq != NULL)
        delete spSeq;
    spSeq = NULL;
}
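// Note on the assertion repeated in every constructor above: the distributed index type IT
// must be at least as wide as the sequential storage's DER::LocalIT, because global indices
// are narrowed to LocalIT when they are handed to the local submatrix. A minimal sketch of
// the intent (illustrative only; SpDCCols is the usual DER and its first template argument
// is its LocalIT, and "fullWorld" stands for a std::shared_ptr<CommGrid> built over
// MPI_COMM_WORLD):
//
//   SpParMat<int64_t, double, SpDCCols<int32_t, double>> A(fullWorld);  // OK: 8 bytes >= 4 bytes
//   SpParMat<int32_t, double, SpDCCols<int64_t, double>> B(fullWorld);  // trips the assert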
template <class IT, class NT, class DER>
template <typename VT, typename GIT>
void SpParMat<IT,NT,DER>::TopKGather(std::vector<NT> & all_medians, std::vector<IT> & nnz_per_col, int thischunk, int chunksize,
    const std::vector<NT> & activemedians, const std::vector<IT> & activennzperc, int itersuntil,
    std::vector< std::vector<NT> > & localmat, const std::vector<IT> & actcolsmap, std::vector<IT> & klimits,
    std::vector<IT> & toretain, std::vector<std::vector<std::pair<IT,NT>>> & tmppair, IT coffset,
    const FullyDistVec<GIT,VT> & rvec) const
{
    int rankincol = commGrid->GetRankInProcCol();
    int colneighs = commGrid->GetGridRows();
    int nprocs = commGrid->GetSize();
    std::vector<double> finalWeightedMedians(thischunk, 0.0);

    MPI_Gather(activemedians.data() + itersuntil*chunksize, thischunk, MPIType<NT>(), all_medians.data(), thischunk, MPIType<NT>(), 0, commGrid->GetColWorld());
    MPI_Gather(activennzperc.data() + itersuntil*chunksize, thischunk, MPIType<IT>(), nnz_per_col.data(), thischunk, MPIType<IT>(), 0, commGrid->GetColWorld());

    if(rankincol == 0)  // the root of the column computes the weighted medians
    {
        std::vector<double> columnCounts(thischunk, 0.0);
        std::vector< std::pair<NT, double> > mediansNweights(colneighs);

        for(int j = 0; j < thischunk; ++j)      // for each column in this chunk
        {
            for(int k = 0; k < colneighs; ++k)
            {
                size_t fetchindex = k*thischunk + j;
                columnCounts[j] += static_cast<double>(nnz_per_col[fetchindex]);
            }
            for(int k = 0; k < colneighs; ++k)
            {
                size_t fetchindex = k*thischunk + j;
                mediansNweights[k] = std::make_pair(all_medians[fetchindex], static_cast<double>(nnz_per_col[fetchindex]) / columnCounts[j]);
            }
            sort(mediansNweights.begin(), mediansNweights.end());   // sort by median value

            double sumofweights = 0;
            int k = 0;
            while(k < colneighs && sumofweights < 0.5)
            {
                sumofweights += mediansNweights[k++].second;
            }
            finalWeightedMedians[j] = mediansNweights[k-1].first;
        }
    }
    MPI_Bcast(finalWeightedMedians.data(), thischunk, MPIType<double>(), 0, commGrid->GetColWorld());

    std::vector<IT> larger(thischunk, 0);
    std::vector<IT> smaller(thischunk, 0);
    std::vector<IT> equal(thischunk, 0);

#pragma omp parallel for
    for(int j = 0; j < thischunk; ++j)
    {
        size_t fetchindex = actcolsmap[j + itersuntil*chunksize];
        for(size_t k = 0; k < localmat[fetchindex].size(); ++k)
        {
            if(localmat[fetchindex][k] > finalWeightedMedians[j])
                larger[j]++;
            else if(localmat[fetchindex][k] < finalWeightedMedians[j])
                smaller[j]++;
            else
                equal[j]++;
        }
    }
    MPI_Allreduce(MPI_IN_PLACE, larger.data(), thischunk, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());
    MPI_Allreduce(MPI_IN_PLACE, smaller.data(), thischunk, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());
    MPI_Allreduce(MPI_IN_PLACE, equal.data(), thischunk, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());

    // ... (declaration of the per-process lock array, "lock", elided in this listing)
    for(int i = 0; i < nprocs; ++i)
        omp_init_lock(&(lock[i]));

    int numThreads = 1;
#pragma omp parallel
    {
        numThreads = omp_get_num_threads();
    }
    std::vector< std::vector<IT> > perthread2retain(numThreads);

#pragma omp parallel for
    for(int j = 0; j < thischunk; ++j)
    {
        int myThread = omp_get_thread_num();
        size_t clmapindex = j + itersuntil*chunksize;   // index into the active-columns map
        size_t fetchindex = actcolsmap[clmapindex];     // local column index

        if(klimits[clmapindex] <= larger[j])
        {
            // the kth largest entry is strictly above the pivot: keep only the larger entries
            std::vector<NT> survivors;
            for(size_t k = 0; k < localmat[fetchindex].size(); ++k)
            {
                if(localmat[fetchindex][k] > finalWeightedMedians[j])
                    survivors.push_back(localmat[fetchindex][k]);
            }
            localmat[fetchindex].swap(survivors);
            perthread2retain[myThread].push_back(clmapindex);   // column stays active
        }
        else if (klimits[clmapindex] > larger[j] + equal[j])
        {
            // the kth largest entry is below the pivot: keep the smaller entries and shrink k
            std::vector<NT> survivors;
            for(size_t k = 0; k < localmat[fetchindex].size(); ++k)
            {
                if(localmat[fetchindex][k] < finalWeightedMedians[j])
                    survivors.push_back(localmat[fetchindex][k]);
            }
            localmat[fetchindex].swap(survivors);
            klimits[clmapindex] -= (larger[j] + equal[j]);
            perthread2retain[myThread].push_back(clmapindex);   // column stays active
        }
        else    // the pivot itself is the kth largest entry: this column is finished
        {
            std::vector<NT> survivors;
            for(size_t k = 0; k < localmat[fetchindex].size(); ++k)
            {
                if(localmat[fetchindex][k] >= finalWeightedMedians[j])
                    survivors.push_back(localmat[fetchindex][k]);
            }
            localmat[fetchindex].swap(survivors);

            IT n_perproc = getlocalcols() / colneighs;
            int assigned = std::min(static_cast<int>(fetchindex/n_perproc), colneighs-1);   // exactly one processor in the column reports the result
            if( assigned == rankincol)
            {
                IT locid;
                int owner = rvec.Owner(coffset + fetchindex, locid);
                omp_set_lock(&(lock[owner]));
                tmppair[owner].emplace_back(std::make_pair(locid, finalWeightedMedians[j]));
                omp_unset_lock(&(lock[owner]));
            }
        }
    }

    // concatenate the per-thread lists of columns that remain active
    std::vector<IT> tdisp(numThreads+1);
    tdisp[0] = 0;
    for(int i=0; i<numThreads; ++i)
    {
        tdisp[i+1] = tdisp[i] + perthread2retain[i].size();
    }
    toretain.resize(tdisp[numThreads]);

#pragma omp parallel for
    for(int i=0; i< numThreads; i++)
    {
        std::copy(perthread2retain[i].data(), perthread2retain[i].data() + perthread2retain[i].size(), toretain.data() + tdisp[i]);
    }

    for (int i=0; i< nprocs; i++)
        omp_destroy_lock(&(lock[i]));
}
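// The per-chunk pivot above is a weighted median of medians: every processor in the column
// contributes its local median together with a weight proportional to its share of the
// column's nonzeros, the pairs are sorted by value, and the first value whose cumulative
// weight reaches 0.5 is taken. A minimal single-node sketch of that rule (illustrative,
// not part of the library):
//
//   double weighted_median(std::vector<std::pair<double,double>> mw)  // (median, weight), weights sum to 1
//   {
//       std::sort(mw.begin(), mw.end());
//       double cum = 0;
//       size_t k = 0;
//       while(k < mw.size() && cum < 0.5) cum += mw[k++].second;
//       return mw[k-1].first;
//   }
//
// Each column is then partitioned against its pivot (the "larger", "equal", "smaller" counts)
// to decide whether the kth largest entry lies above, at, or below the pivot.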
template <class IT, class NT, class DER>
template <typename VT, typename GIT>
bool SpParMat<IT,NT,DER>::Kselect2(FullyDistVec<GIT,VT> & rvec, IT k_limit) const
{
    if(*rvec.commGrid != *commGrid)
    {
        SpParHelper::Print("Grids are not comparable, SpParMat::Kselect() fails!", commGrid->GetWorld());
        MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
    }
    int rankincol = commGrid->GetRankInProcCol();
    int rankinrow = commGrid->GetRankInProcRow();
    int rowneighs = commGrid->GetGridCols();
    int colneighs = commGrid->GetGridRows();
    int myrank = commGrid->GetRank();
    int nprocs = commGrid->GetSize();

    FullyDistVec<GIT, IT> colcnt(getcommgrid());    // number of nonzeros in each column
    Reduce(colcnt, Column, std::plus<IT>(), (IT) 0, [](NT i){ return (IT) 1; });

    // replicate the per-column counts along the processor column
    int xsize = (int) colcnt.LocArrSize();
    int trxsize = 0;
    int diagneigh = colcnt.commGrid->GetComplementRank();
    MPI_Status status;
    MPI_Sendrecv(&xsize, 1, MPI_INT, diagneigh, TRX, &trxsize, 1, MPI_INT, diagneigh, TRX, commGrid->GetWorld(), &status);

    IT * trxnums = new IT[trxsize];
    MPI_Sendrecv(const_cast<IT*>(SpHelper::p2a(colcnt.arr)), xsize, MPIType<IT>(), diagneigh, TRX, trxnums, trxsize, MPIType<IT>(), diagneigh, TRX, commGrid->GetWorld(), &status);

    int * colsize = new int[colneighs];
    colsize[rankincol] = trxsize;
    MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colsize, 1, MPI_INT, commGrid->GetColWorld());
    int * dpls = new int[colneighs]();
    std::partial_sum(colsize, colsize+colneighs-1, dpls+1);
    int accsize = std::accumulate(colsize, colsize+colneighs, 0);
    std::vector<IT> percsum(accsize);
    MPI_Allgatherv(trxnums, trxsize, MPIType<IT>(), percsum.data(), colsize, dpls, MPIType<IT>(), commGrid->GetColWorld());
    DeleteAll(trxnums, colsize, dpls);

    IT locm = getlocalcols();
    std::vector< std::vector<NT> > localmat(locm);  // one value vector per local column
    if(accsize != locm) std::cout << "Gather vector along columns logic is wrong" << std::endl;

    for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)
    {
        if(percsum[colit.colid()] >= k_limit)   // only columns with at least k_limit nonzeros are active
        {
            localmat[colit.colid()].reserve(colit.nnz());
            for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit < spSeq->endnz(colit); ++nzit)
            {
                localmat[colit.colid()].push_back(nzit.value());
            }
        }
    }

    int64_t activecols = std::count_if(percsum.begin(), percsum.end(), [k_limit](IT i){ return i >= k_limit; });
    int64_t activennz = std::accumulate(percsum.begin(), percsum.end(), (int64_t) 0);

    int64_t totactcols, totactnnzs;
    MPI_Allreduce(&activecols, &totactcols, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetRowWorld());
    if(myrank == 0) std::cout << "Number of initial active columns is " << totactcols << std::endl;

    MPI_Allreduce(&activennz, &totactnnzs, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetRowWorld());
    if(myrank == 0) std::cout << "Number of initial nonzeros is " << totactnnzs << std::endl;

    IT glactcols = colcnt.Count([k_limit](IT i){ return i >= k_limit; });
    if(myrank == 0) std::cout << "Number of initial active columns is " << glactcols << std::endl;
    if(glactcols != totactcols)
        if(myrank == 0) std::cout << "Wrong number of active columns was computed" << std::endl;

    rvec = FullyDistVec<GIT,VT> ( rvec.getcommgrid(), getncol(), std::numeric_limits<NT>::min());

    if(totactcols == 0)     // no column has k_limit entries: nothing to select
    {
        std::ostringstream ss;
        ss << "TopK: k_limit (" << k_limit << ")" << " >= maxNnzInColumn. Returning the result of Reduce(Column, minimum<NT>()) instead..." << std::endl;
        SpParHelper::Print(ss.str());
        // ... (the fallback Reduce and the early return are elided in this listing)
    }

    std::vector<IT> actcolsmap(activecols);     // maps active column index -> local column index
    for (IT i=0, j=0; i< locm; ++i)
    {
        if(percsum[i] >= k_limit)
            actcolsmap[j++] = i;
    }

    std::vector<NT> all_medians;
    std::vector<IT> nnz_per_col;
    std::vector<IT> klimits(activecols, k_limit);   // the current k of each active column
    int activecols_lowerbound = 10*colneighs;

    IT * locncols = new IT[rowneighs];
    locncols[rankinrow] = locm;
    MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), locncols, 1, MPIType<IT>(), commGrid->GetRowWorld());
    IT coffset = std::accumulate(locncols, locncols+rankinrow, static_cast<IT>(0));
    delete [] locncols;

    MPI_Datatype MPI_pair;
    MPI_Type_contiguous(sizeof(std::pair<IT,NT>), MPI_CHAR, &MPI_pair);
    MPI_Type_commit(&MPI_pair);

    int * sendcnt = new int[nprocs];
    int * recvcnt = new int[nprocs];
    int * sdispls = new int[nprocs]();
    int * rdispls = new int[nprocs]();

    while(totactcols > 0)
    {
        int chunksize, iterations, lastchunk;
        if(activecols > activecols_lowerbound)
        {
            // process the active columns in chunks of roughly activecols/colneighs
            chunksize = static_cast<int>(activecols/colneighs);
            iterations = std::max(static_cast<int>(activecols/chunksize), 1);
            lastchunk = activecols - (iterations-1)*chunksize;
        }
        else
        {
            chunksize = activecols;
            iterations = 1;
            lastchunk = activecols;
        }
        std::vector<NT> activemedians(activecols);
        std::vector<IT> activennzperc(activecols);

#pragma omp parallel for
        for(IT i=0; i< activecols; ++i)
        {
            size_t orgindex = actcolsmap[i];    // original local column index
            if(localmat[orgindex].empty())
            {
                activemedians[i] = (NT) 0;
                activennzperc[i] = 0;
            }
            else
            {
                auto entriesincol(localmat[orgindex]);  // copy because nth_element reorders
                std::nth_element(entriesincol.begin(), entriesincol.begin() + entriesincol.size()/2, entriesincol.end());
                activemedians[i] = entriesincol[entriesincol.size()/2];
                activennzperc[i] = entriesincol.size();
            }
        }

        percsum.resize(activecols, 0);
        MPI_Allreduce(activennzperc.data(), percsum.data(), activecols, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());
        activennz = std::accumulate(percsum.begin(), percsum.end(), (int64_t) 0);

        MPI_Allreduce(&activennz, &totactnnzs, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetRowWorld());
        if(myrank == 0) std::cout << "Number of active nonzeros is " << totactnnzs << std::endl;

        std::vector<IT> toretain;

        all_medians.resize(lastchunk*colneighs);
        nnz_per_col.resize(lastchunk*colneighs);
        std::vector< std::vector< std::pair<IT,NT> > > tmppair(nprocs);
        for(int i=0; i< iterations-1; ++i)
        {
            TopKGather(all_medians, nnz_per_col, chunksize, chunksize, activemedians, activennzperc, i, localmat, actcolsmap, klimits, toretain, tmppair, coffset, rvec);
        }
        TopKGather(all_medians, nnz_per_col, lastchunk, chunksize, activemedians, activennzperc, iterations-1, localmat, actcolsmap, klimits, toretain, tmppair, coffset, rvec);

        // ship the finished columns' kth values to the owners of the result vector
        IT totsend = 0;
        for(int i=0; i<nprocs; ++i)
        {
            sendcnt[i] = tmppair[i].size();
            totsend += tmppair[i].size();
        }
        MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld());

        std::partial_sum(sendcnt, sendcnt+nprocs-1, sdispls+1);
        std::partial_sum(recvcnt, recvcnt+nprocs-1, rdispls+1);
        IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
        assert((totsend < std::numeric_limits<int>::max()));
        assert((totrecv < std::numeric_limits<int>::max()));

        std::pair<IT,NT> * sendpair = new std::pair<IT,NT>[totsend];
        for(int i=0; i<nprocs; ++i)
        {
            std::copy(tmppair[i].begin(), tmppair[i].end(), sendpair+sdispls[i]);
            std::vector< std::pair<IT,NT> >().swap(tmppair[i]);     // free the buffer
        }
        std::vector< std::pair<IT,NT> > recvpair(totrecv);
        MPI_Alltoallv(sendpair, sendcnt, sdispls, MPI_pair, recvpair.data(), recvcnt, rdispls, MPI_pair, commGrid->GetWorld());
        delete [] sendpair;

        IT updated = 0;
        for(auto & update : recvpair )
        {
            ++updated;
            rvec.arr[update.first] = update.second;
        }
        MPI_Allreduce(MPI_IN_PLACE, &updated, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());
        if(myrank == 0) std::cout << "Total vector entries updated " << updated << std::endl;

        // keep only the columns that are still active
        std::vector<IT> newactivecols(toretain.size());
        std::vector<IT> newklimits(toretain.size());
        IT newindex = 0;
        for(auto & retind : toretain )
        {
            newactivecols[newindex] = actcolsmap[retind];
            newklimits[newindex++] = klimits[retind];
        }
        actcolsmap.swap(newactivecols);
        klimits.swap(newklimits);
        activecols = actcolsmap.size();

        MPI_Allreduce(&activecols, &totactcols, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetRowWorld());
        if(myrank == 0) std::cout << "Number of active columns is " << totactcols << std::endl;
    }
    DeleteAll(sendcnt, recvcnt, sdispls, rdispls);
    MPI_Type_free(&MPI_pair);
    if(myrank == 0) std::cout << "Exiting kselect2" << std::endl;
    return true;
}
template <class IT, class NT, class DER>
void SpParMat< IT,NT,DER >::Dump(std::string filename) const
{
    MPI_Comm World = commGrid->GetWorld();
    int rank = commGrid->GetRank();
    int nprocs = commGrid->GetSize();

    MPI_File thefile;
    char * _fn = const_cast<char*>(filename.c_str());   // MPI_File_open wants a non-const name
    MPI_File_open(World, _fn, MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &thefile);

    int rankinrow = commGrid->GetRankInProcRow();
    int rankincol = commGrid->GetRankInProcCol();
    int rowneighs = commGrid->GetGridCols();
    int colneighs = commGrid->GetGridRows();

    IT * colcnts = new IT[rowneighs];
    IT * rowcnts = new IT[colneighs];
    rowcnts[rankincol] = getlocalrows();
    colcnts[rankinrow] = getlocalcols();

    MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), colcnts, 1, MPIType<IT>(), commGrid->GetRowWorld());
    IT coloffset = std::accumulate(colcnts, colcnts+rankinrow, static_cast<IT>(0));

    MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), rowcnts, 1, MPIType<IT>(), commGrid->GetColWorld());
    IT rowoffset = std::accumulate(rowcnts, rowcnts+rankincol, static_cast<IT>(0));
    DeleteAll(colcnts, rowcnts);

    IT * prelens = new IT[nprocs];
    prelens[rank] = 2*getlocalnnz();    // every nonzero is written as a (row, column) endpoint pair
    MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), prelens, 1, MPIType<IT>(), commGrid->GetWorld());
    IT lengthuntil = std::accumulate(prelens, prelens+rank, static_cast<IT>(0));

    MPI_Offset disp = lengthuntil * sizeof(uint32_t);
    char native[] = "native";
    MPI_File_set_view(thefile, disp, MPI_UNSIGNED, MPI_UNSIGNED, native, MPI_INFO_NULL);
    uint32_t * gen_edges = new uint32_t[prelens[rank]];

    IT k = 0;
    for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)
    {
        for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
        {
            gen_edges[k++] = (uint32_t) (nzit.rowid() + rowoffset);
            gen_edges[k++] = (uint32_t) (colit.colid() + coloffset);
        }
    }
    assert(k == prelens[rank]);
    MPI_File_write(thefile, gen_edges, prelens[rank], MPI_UNSIGNED, NULL);
    MPI_File_close(&thefile);

    DeleteAll(prelens, gen_edges);
}
template <class IT, class NT, class DER>
void SpParMat< IT,NT,DER >::ParallelBinaryWrite(std::string filename) const
{
    int myrank = commGrid->GetRank();
    int nprocs = commGrid->GetSize();
    IT totalm = getnrow();
    IT totaln = getncol();
    IT totnnz = getnnz();   // global dimensions and nonzero count go into the header

    // every nonzero is written as a (rowid, colid, value) record
    int64_t elementsize = 2*sizeof(IT) + sizeof(NT);
    // ... (the header byte count, "headersize", and the header fields, "hdr", are elided in this listing)

    int64_t localentries = getlocalnnz();
    int64_t localbytes = localentries*elementsize;
    if(myrank == 0)
        localbytes += headersize;   // only the head process writes the header

    int64_t bytesuntil = 0;
    MPI_Exscan( &localbytes, &bytesuntil, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetWorld());
    if(myrank == 0) bytesuntil = 0; // because MPI_Exscan is undefined on the first process

    int64_t bytestotal = 0;
    MPI_Allreduce(&localbytes, &bytestotal, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetWorld());

    size_t writtensofar = 0;
    char * localdata = new char[localbytes];
    if(myrank == 0)
    {
        char start[5] = "HKDT";     // 4-byte magic
        std::memmove(localdata, start, 4);
        std::memmove(localdata+4, hdr, sizeof(hdr));
        writtensofar = headersize;
    }

    IT roffset = 0, coffset = 0;
    GetPlaceInGlobalGrid(roffset, coffset);

    for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)
    {
        for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
        {
            IT glrowid = nzit.rowid() + roffset;
            IT glcolid = colit.colid() + coffset;
            NT glvalue = nzit.value();
            std::memmove(localdata+writtensofar, &glrowid, sizeof(IT));
            std::memmove(localdata+writtensofar+sizeof(IT), &glcolid, sizeof(IT));
            std::memmove(localdata+writtensofar+2*sizeof(IT), &glvalue, sizeof(NT));
            writtensofar += (2*sizeof(IT) + sizeof(NT));
        }
    }
    if(myrank == 0)
        std::cout << "local move happened..., writing to file\n";

    MPI_File thefile;
    MPI_File_open(commGrid->GetWorld(), (char*) filename.c_str(), MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &thefile);
    MPI_File_set_view(thefile, bytesuntil, MPI_CHAR, MPI_CHAR, (char*)"native", MPI_INFO_NULL);

    int64_t batchSize = 256 * 1024 * 1024;  // write in batches of 256 MB
    size_t localfileptr = 0;
    int64_t remaining = localbytes;
    int64_t totalremaining = bytestotal;

    while(totalremaining > 0)
    {
        if(myrank == 0)
            std::cout << "Remaining " << totalremaining << " bytes to write in aggregate" << std::endl;
        MPI_Status status;
        int curBatch = (int) std::min(batchSize, remaining);
        MPI_File_write_all(thefile, localdata+localfileptr, curBatch, MPI_CHAR, &status);
        int count = 0;
        MPI_Get_count(&status, MPI_CHAR, &count);
        assert( (curBatch == 0) || (count == curBatch) );
        localfileptr += curBatch;
        remaining -= curBatch;
        MPI_Allreduce(&remaining, &totalremaining, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetWorld());
    }
    MPI_File_close(&thefile);
    delete [] localdata;
}
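// On-disk layout produced above: the 4-byte magic "HKDT", the header fields, then one
// fixed-size record per nonzero laid out exactly as the memmove calls pack it. A sketch of
// the per-record view (illustrative only; field widths follow the sizeof(IT) and sizeof(NT)
// of the instantiation that wrote the file, with no padding between fields):
//
//   #pragma pack(push, 1)
//   template <class IT, class NT>
//   struct BinaryNzRecord { IT rowid; IT colid; NT value; };   // 2*sizeof(IT) + sizeof(NT) bytes
//   #pragma pack(pop)
//
// A reader therefore has to use the same IT/NT widths (and endianness) as the writer.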
template <class IT, class NT, class DER>
SpParMat< IT,NT,DER >::SpParMat (const SpParMat< IT,NT,DER > & rhs)
{
    if(rhs.spSeq != NULL)
        spSeq = new DER(*(rhs.spSeq));  // deep copy of the local matrix
    commGrid = rhs.commGrid;
}

template <class IT, class NT, class DER>
SpParMat< IT,NT,DER > & SpParMat< IT,NT,DER >::operator=(const SpParMat< IT,NT,DER > & rhs)
{
    if(this != &rhs)
    {
        if(spSeq != NULL)
            delete spSeq;
        if(rhs.spSeq != NULL)
            spSeq = new DER(*(rhs.spSeq));
        commGrid = rhs.commGrid;
    }
    return *this;
}
template <class IT, class NT, class DER>
SpParMat< IT,NT,DER > & SpParMat< IT,NT,DER >::operator+=(const SpParMat< IT,NT,DER > & rhs)
{
    if(this != &rhs)
    {
        if(*commGrid == *rhs.commGrid)
        {
            (*spSeq) += (*(rhs.spSeq));
        }
        else
        {
            std::cout << "Grids are not comparable for parallel addition (A+B)" << std::endl;
        }
    }
    else
    {
        std::cout << "Missing feature (A+A): Use multiply with 2 instead !" << std::endl;
    }
    return *this;
}
template <class IT, class NT, class DER>
float SpParMat< IT,NT,DER >::LoadImbalance() const
{
    IT totnnz = getnnz();
    IT maxnnz = 0;
    IT localnnz = (IT) spSeq->getnnz();
    MPI_Allreduce( &localnnz, &maxnnz, 1, MPIType<IT>(), MPI_MAX, commGrid->GetWorld());
    if(totnnz == 0) return 1;
    return static_cast<float>((commGrid->GetSize() * maxnnz)) / static_cast<float>(totnnz);
}
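// LoadImbalance() returns p * max_i(nnz_i) / sum_i(nnz_i): the ratio between the heaviest
// processor and the average. For example, with p = 4 and local nonzero counts {10, 10, 10, 30}
// it returns 4*30/60 = 2.0, while a perfectly balanced distribution returns 1.0.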
template <class IT, class NT, class DER>
IT SpParMat< IT,NT,DER >::getnnz() const
{
    IT totalnnz = 0;
    IT localnnz = spSeq->getnnz();
    MPI_Allreduce( &localnnz, &totalnnz, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());
    return totalnnz;
}

template <class IT, class NT, class DER>
IT SpParMat< IT,NT,DER >::getnrow() const
{
    IT totalrows = 0;
    IT localrows = spSeq->getnrow();
    MPI_Allreduce( &localrows, &totalrows, 1, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());
    return totalrows;
}

template <class IT, class NT, class DER>
IT SpParMat< IT,NT,DER >::getncol() const
{
    IT totalcols = 0;
    IT localcols = spSeq->getncol();
    MPI_Allreduce( &localcols, &totalcols, 1, MPIType<IT>(), MPI_SUM, commGrid->GetRowWorld());
    return totalcols;
}
template <class IT, class NT, class DER>
template <typename _BinaryOperation>
void SpParMat<IT,NT,DER>::DimApply(Dim dim, const FullyDistVec<IT, NT> & x, _BinaryOperation __binary_op)
{
    if(!(*commGrid == *(x.commGrid)))
    {
        std::cout << "Grids are not comparable for SpParMat::DimApply" << std::endl;
        MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
    }

    MPI_Comm World = x.commGrid->GetWorld();
    MPI_Comm ColWorld = x.commGrid->GetColWorld();
    MPI_Comm RowWorld = x.commGrid->GetRowWorld();
    switch(dim)
    {
        case Column:    // scale each column j by x[j]
        {
            int xsize = (int) x.LocArrSize();
            int trxsize = 0;
            int diagneigh = x.commGrid->GetComplementRank();
            MPI_Status status;
            MPI_Sendrecv(&xsize, 1, MPI_INT, diagneigh, TRX, &trxsize, 1, MPI_INT, diagneigh, TRX, World, &status);

            NT * trxnums = new NT[trxsize];
            MPI_Sendrecv(const_cast<NT*>(SpHelper::p2a(x.arr)), xsize, MPIType<NT>(), diagneigh, TRX, trxnums, trxsize, MPIType<NT>(), diagneigh, TRX, World, &status);

            int colneighs, colrank;
            MPI_Comm_size(ColWorld, &colneighs);
            MPI_Comm_rank(ColWorld, &colrank);
            int * colsize = new int[colneighs];
            colsize[colrank] = trxsize;

            MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colsize, 1, MPI_INT, ColWorld);
            int * dpls = new int[colneighs]();
            std::partial_sum(colsize, colsize+colneighs-1, dpls+1);
            int accsize = std::accumulate(colsize, colsize+colneighs, 0);
            NT * scaler = new NT[accsize];

            MPI_Allgatherv(trxnums, trxsize, MPIType<NT>(), scaler, colsize, dpls, MPIType<NT>(), ColWorld);
            DeleteAll(trxnums, colsize, dpls);

            for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)
            {
                for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
                {
                    nzit.value() = __binary_op(nzit.value(), scaler[colit.colid()]);
                }
            }
            delete [] scaler;
            break;
        }
        case Row:   // scale each row i by x[i]
        {
            int xsize = (int) x.LocArrSize();
            int rowneighs, rowrank;
            MPI_Comm_size(RowWorld, &rowneighs);
            MPI_Comm_rank(RowWorld, &rowrank);
            int * rowsize = new int[rowneighs];
            rowsize[rowrank] = xsize;
            MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, rowsize, 1, MPI_INT, RowWorld);
            int * dpls = new int[rowneighs]();
            std::partial_sum(rowsize, rowsize+rowneighs-1, dpls+1);
            int accsize = std::accumulate(rowsize, rowsize+rowneighs, 0);
            NT * scaler = new NT[accsize];

            MPI_Allgatherv(const_cast<NT*>(SpHelper::p2a(x.arr)), xsize, MPIType<NT>(), scaler, rowsize, dpls, MPIType<NT>(), RowWorld);
            DeleteAll(rowsize, dpls);

            for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)
            {
                for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
                {
                    nzit.value() = __binary_op(nzit.value(), scaler[nzit.rowid()]);
                }
            }
            delete [] scaler;
            break;
        }
        default:
        {
            std::cout << "Unknown scaling dimension, returning..." << std::endl;
            break;
        }
    }
}
// Convenience Reduce overloads: they build the result vector (warning on an unknown
// dimension) and delegate to the in-place Reduce implementations below.
template <class IT, class NT, class DER>
template <typename _BinaryOperation, typename _UnaryOperation>
DenseParVec<IT,NT> SpParMat<IT,NT,DER>::Reduce(Dim dim, _BinaryOperation __binary_op, NT id, _UnaryOperation __unary_op) const
{
    DenseParVec<IT,NT> parvec(commGrid, id);
    if(dim != Column && dim != Row)
        std::cout << "Unknown reduction dimension, returning empty vector" << std::endl;
    Reduce(parvec, dim, __binary_op, id, __unary_op);
    return parvec;
}

template <class IT, class NT, class DER>
template <typename _BinaryOperation>
DenseParVec<IT,NT> SpParMat<IT,NT,DER>::Reduce(Dim dim, _BinaryOperation __binary_op, NT id) const
{
    DenseParVec<IT,NT> parvec(commGrid, id);
    if(dim != Column && dim != Row)
        std::cout << "Unknown reduction dimension, returning empty vector" << std::endl;
    Reduce(parvec, dim, __binary_op, id, myidentity<NT>());
    return parvec;
}

template <class IT, class NT, class DER>
template <typename VT, typename GIT, typename _BinaryOperation>
void SpParMat<IT,NT,DER>::Reduce(FullyDistVec<GIT,VT> & rvec, Dim dim, _BinaryOperation __binary_op, VT id) const
{
    Reduce(rvec, dim, __binary_op, id, myidentity<NT>());
}

template <class IT, class NT, class DER>
template <typename VT, typename GIT, typename _BinaryOperation, typename _UnaryOperation>
void SpParMat<IT,NT,DER>::Reduce(FullyDistVec<GIT,VT> & rvec, Dim dim, _BinaryOperation __binary_op, VT id, _UnaryOperation __unary_op) const
{
    Reduce(rvec, dim, __binary_op, id, __unary_op, MPIOp<_BinaryOperation, VT>::op());
}
template <class IT, class NT, class DER>
template <typename VT, typename GIT, typename _BinaryOperation, typename _UnaryOperation>
void SpParMat<IT,NT,DER>::Reduce(FullyDistVec<GIT,VT> & rvec, Dim dim, _BinaryOperation __binary_op, VT id, _UnaryOperation __unary_op, MPI_Op mympiop) const
{
    if(*rvec.commGrid != *commGrid)
    {
        SpParHelper::Print("Grids are not comparable, SpParMat::Reduce() fails!", commGrid->GetWorld());
        MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
    }
    switch(dim)
    {
        case Column:    // pack along the columns, result is a vector of size n
        {
            IT n_thiscol = getlocalcols();
            int colneighs = commGrid->GetGridRows();
            int colrank = commGrid->GetRankInProcCol();

            GIT * loclens = new GIT[colneighs];
            GIT * lensums = new GIT[colneighs+1]();     // begin/end points of the local slices

            GIT n_perproc = n_thiscol / colneighs;
            if(colrank == colneighs-1)
                loclens[colrank] = n_thiscol - (n_perproc*colrank);
            else
                loclens[colrank] = n_perproc;

            MPI_Allgather(MPI_IN_PLACE, 0, MPIType<GIT>(), loclens, 1, MPIType<GIT>(), commGrid->GetColWorld());
            std::partial_sum(loclens, loclens+colneighs, lensums+1);

            std::vector<VT> trarr;
            typename DER::SpColIter colit = spSeq->begcol();
            for(int i=0; i< colneighs; ++i)
            {
                VT * sendbuf = new VT[loclens[i]];
                std::fill(sendbuf, sendbuf+loclens[i], id);     // fill with the identity

                for(; colit != spSeq->endcol() && colit.colid() < lensums[i+1]; ++colit)    // iterate over processor i's slice
                {
                    for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
                    {
                        sendbuf[colit.colid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[colit.colid()-lensums[i]]);
                    }
                }
                VT * recvbuf = NULL;
                if(colrank == i)
                {
                    trarr.resize(loclens[i]);
                    recvbuf = SpHelper::p2a(trarr);
                }
                MPI_Reduce(sendbuf, recvbuf, loclens[i], MPIType<VT>(), mympiop, i, commGrid->GetColWorld());
                delete [] sendbuf;
            }
            DeleteAll(loclens, lensums);

            GIT trlen = trarr.size();
            int diagneigh = commGrid->GetComplementRank();
            IT reallen;
            MPI_Status status;
            MPI_Sendrecv(&trlen, 1, MPIType<IT>(), diagneigh, TRNNZ, &reallen, 1, MPIType<IT>(), diagneigh, TRNNZ, commGrid->GetWorld(), &status);

            rvec.arr.resize(reallen);
            MPI_Sendrecv(SpHelper::p2a(trarr), trlen, MPIType<VT>(), diagneigh, TRX, SpHelper::p2a(rvec.arr), reallen, MPIType<VT>(), diagneigh, TRX, commGrid->GetWorld(), &status);
            rvec.glen = getncol();
            break;
        }
        case Row:   // pack along the rows, result is a vector of size m
        {
            rvec.glen = getnrow();
            int rowneighs = commGrid->GetGridCols();
            int rowrank = commGrid->GetRankInProcRow();
            GIT * loclens = new GIT[rowneighs];
            GIT * lensums = new GIT[rowneighs+1]();
            loclens[rowrank] = rvec.MyLocLength();
            MPI_Allgather(MPI_IN_PLACE, 0, MPIType<GIT>(), loclens, 1, MPIType<GIT>(), commGrid->GetRowWorld());
            std::partial_sum(loclens, loclens+rowneighs, lensums+1);
            try
            {
                rvec.arr.resize(loclens[rowrank], id);

                // process columns in batches to bound memory; per-column nonzero fingers let
                // every batch resume where the previous one stopped
                #define MAXCOLUMNBATCH 5 * 1024 * 1024
                typename DER::SpColIter begfinger = spSeq->begcol();

                // every processor on the same processor row must execute the same number of reduce calls
                int numreducecalls = (int) ceil(static_cast<float>(spSeq->getnzc()) / static_cast<float>(MAXCOLUMNBATCH));
                int maxreducecalls;
                MPI_Allreduce( &numreducecalls, &maxreducecalls, 1, MPI_INT, MPI_MAX, commGrid->GetRowWorld());

                for(int k=0; k< maxreducecalls; ++k)
                {
                    std::vector<typename DER::SpColIter::NzIter> nziters;
                    typename DER::SpColIter curfinger = begfinger;
                    for(; curfinger != spSeq->endcol() && nziters.size() < MAXCOLUMNBATCH ; ++curfinger)
                    {
                        nziters.push_back(spSeq->begnz(curfinger));
                    }
                    for(int i=0; i< rowneighs; ++i)     // step by step to save memory
                    {
                        VT * sendbuf = new VT[loclens[i]];
                        std::fill(sendbuf, sendbuf+loclens[i], id);

                        typename DER::SpColIter colit = begfinger;
                        IT colcnt = 0;
                        for(; colit != curfinger; ++colit, ++colcnt)
                        {
                            typename DER::SpColIter::NzIter nzit = nziters[colcnt];
                            for(; nzit != spSeq->endnz(colit) && nzit.rowid() < lensums[i+1]; ++nzit)
                            {
                                sendbuf[nzit.rowid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[nzit.rowid()-lensums[i]]);
                            }
                            nziters[colcnt] = nzit;     // remember where this column stopped
                        }

                        VT * recvbuf = NULL;
                        if(rowrank == i)
                        {
                            for(int j=0; j< loclens[i]; ++j)    // fold in previously reduced batches
                            {
                                sendbuf[j] = __binary_op(rvec.arr[j], sendbuf[j]);
                            }
                            recvbuf = SpHelper::p2a(rvec.arr);
                        }
                        MPI_Reduce(sendbuf, recvbuf, loclens[i], MPIType<VT>(), mympiop, i, commGrid->GetRowWorld());
                        delete [] sendbuf;
                    }
                    begfinger = curfinger;
                }
                DeleteAll(loclens, lensums);
            }
            catch (std::length_error& le)
            {
                std::cerr << "Length error: " << le.what() << std::endl;
            }
            break;
        }
        default:
        {
            std::cout << "Unknown reduction dimension, returning empty vector" << std::endl;
            break;
        }
    }
}
#define KSELECTLIMIT 10000

//! Sparse-vector flavor: returns the kth largest nonzero of every column that appears in rvec.
template <class IT, class NT, class DER>
template <typename VT, typename GIT>
bool SpParMat<IT,NT,DER>::Kselect(FullyDistSpVec<GIT,VT> & rvec, IT k_limit, int kselectVersion) const
{
#ifdef COMBBLAS_DEBUG
    // cross-check the two k-select implementations against each other
    // ...
    Kselect2(test2, k_limit);
    // ...
#endif
    if(kselectVersion==1 || k_limit < KSELECTLIMIT)
    {
        return Kselect1(rvec, k_limit, myidentity<NT>());
    }
    else
    {
        // run the iterative algorithm into a dense vector, then restrict it to rvec's indices
        FullyDistVec<GIT,VT> kthAll(getcommgrid());
        bool ret = Kselect2(kthAll, k_limit);
        // ... (restriction of kthAll to rvec's indices; the selector and predicate used are)
        //     [](VT spval, VT dval){ return dval; },
        //     [](VT spval, VT dval){ return true; },
        // ...
        return ret;
    }
}
//! Dense-vector flavor: returns the kth largest nonzero of every column.
template <class IT, class NT, class DER>
template <typename VT, typename GIT>
bool SpParMat<IT,NT,DER>::Kselect(FullyDistVec<GIT,VT> & rvec, IT k_limit, int kselectVersion) const
{
#ifdef COMBBLAS_DEBUG
    // cross-check the two k-select implementations against each other
    // ...
    Kselect2(test2, k_limit);
    // ...
#endif
    if(kselectVersion==1 || k_limit < KSELECTLIMIT)
    {
        return Kselect1(rvec, k_limit, myidentity<NT>());
    }
    else
        return Kselect2(rvec, k_limit);
}
template <class IT, class NT, class DER>
template <typename VT, typename GIT, typename _UnaryOperation>
bool SpParMat<IT,NT,DER>::Kselect1(FullyDistVec<GIT,VT> & rvec, IT k, _UnaryOperation __unary_op) const
{
    if(*rvec.commGrid != *commGrid)
    {
        SpParHelper::Print("Grids are not comparable, SpParMat::Kselect() fails!", commGrid->GetWorld());
        MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
    }
    std::cerr << "Dense Kselect is called!!" << std::endl;

    FullyDistVec<GIT, IT> nnzPerColumn(getcommgrid());
    Reduce(nnzPerColumn, Column, std::plus<IT>(), (IT)0, [](NT val){ return (IT)1; });
    IT maxnnzPerColumn = nnzPerColumn.Reduce(maximum<IT>(), (IT)0);
    if(k > maxnnzPerColumn)
    {
        SpParHelper::Print("Kselect: k is greater than maxNnzInColumn. Calling Reduce instead...\n");
        // ... (falls back to a plain column-wise Reduce; elided in this listing)
    }

    IT n_thiscol = getlocalcols();

    std::vector<VT> sendbuf(n_thiscol*k);
    std::vector<IT> send_coldisp(n_thiscol+1, 0);   // displacement of the top-k entries per column
    std::vector<IT> local_coldisp(n_thiscol+1, 0);  // displacement of all entries per column

    if(spSeq->getnnz()>0)
    {
        typename DER::SpColIter colit = spSeq->begcol();
        for(IT i=0; i<n_thiscol; ++i)
        {
            local_coldisp[i+1] = local_coldisp[i];
            send_coldisp[i+1] = send_coldisp[i];
            if((colit != spSeq->endcol()) && (i==colit.colid()))
            {
                local_coldisp[i+1] += colit.nnz();
                if(colit.nnz() >= k)
                    send_coldisp[i+1] += k;
                else
                    send_coldisp[i+1] += colit.nnz();
                ++colit;
            }
        }
    }
    assert(local_coldisp[n_thiscol] == spSeq->getnnz());

    // a (transformed) copy of the local matrix values, column by column
    std::vector<VT> localmat(spSeq->getnnz());

    IT nzc = spSeq->getnzc();
#pragma omp parallel for
    for(IT i=0; i<nzc; i++)
    {
        typename DER::SpColIter colit = spSeq->begcol() + i;
        IT colid = colit.colid();
        IT idx = local_coldisp[colid];
        for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit < spSeq->endnz(colit); ++nzit)
        {
            localmat[idx++] = static_cast<VT>(__unary_op(nzit.value()));
        }

        if(colit.nnz() <= k)    // keep the whole column, sorted in decreasing order
        {
            std::sort(localmat.begin()+local_coldisp[colid], localmat.begin()+local_coldisp[colid+1], std::greater<VT>());
            std::copy(localmat.begin()+local_coldisp[colid], localmat.begin()+local_coldisp[colid+1], sendbuf.begin()+send_coldisp[colid]);
        }
        else                    // keep only the k largest entries
        {
            std::partial_sort(localmat.begin()+local_coldisp[colid], localmat.begin()+local_coldisp[colid]+k, localmat.begin()+local_coldisp[colid+1], std::greater<VT>());
            std::copy(localmat.begin()+local_coldisp[colid], localmat.begin()+local_coldisp[colid]+k, sendbuf.begin()+send_coldisp[colid]);
        }
    }

    std::vector<VT>().swap(localmat);
    std::vector<IT>().swap(local_coldisp);

    std::vector<VT> recvbuf(n_thiscol*k);
    std::vector<VT> tempbuf(n_thiscol*k);
    std::vector<IT> recv_coldisp(n_thiscol+1);
    std::vector<IT> templen(n_thiscol);

    int colneighs = commGrid->GetGridRows();
    int colrank = commGrid->GetRankInProcCol();

    // tournament (binary-tree) reduction along the processor column
    for(int p=2; p <= colneighs; p*=2)
    {
        if(colrank%p == p/2)    // sender in this round
        {
            int receiver = colrank - ceil(p/2);
            MPI_Send(send_coldisp.data(), n_thiscol+1, MPIType<IT>(), receiver, 0, commGrid->GetColWorld());
            MPI_Send(sendbuf.data(), send_coldisp[n_thiscol], MPIType<VT>(), receiver, 1, commGrid->GetColWorld());
        }
        else if(colrank%p == 0)     // receiver in this round
        {
            int sender = colrank + ceil(p/2);
            if(sender < colneighs)
            {
                MPI_Recv(recv_coldisp.data(), n_thiscol+1, MPIType<IT>(), sender, 0, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
                MPI_Recv(recvbuf.data(), recv_coldisp[n_thiscol], MPIType<VT>(), sender, 1, commGrid->GetColWorld(), MPI_STATUS_IGNORE);

#pragma omp parallel for
                for(IT i=0; i<n_thiscol; ++i)   // merge the two sorted top-k lists of column i
                {
                    IT j=send_coldisp[i], l=recv_coldisp[i];
                    IT offset = i*k;    // tempbuf holds up to k entries per column
                    IT lid = 0;
                    for(; j<send_coldisp[i+1] && l<recv_coldisp[i+1] && lid<k;)
                    {
                        if(sendbuf[j] > recvbuf[l])
                            tempbuf[offset+lid++] = sendbuf[j++];
                        else
                            tempbuf[offset+lid++] = recvbuf[l++];
                    }
                    while(j<send_coldisp[i+1] && lid<k) tempbuf[offset+lid++] = sendbuf[j++];
                    while(l<recv_coldisp[i+1] && lid<k) tempbuf[offset+lid++] = recvbuf[l++];
                    templen[i] = lid;
                }

                send_coldisp[0] = 0;
                for(IT i=0; i<n_thiscol; i++)
                {
                    send_coldisp[i+1] = send_coldisp[i] + templen[i];
                }

#pragma omp parallel for
                for(IT i=0; i<n_thiscol; i++)   // compact the merged lists back into sendbuf
                {
                    IT offset = i*k;
                    std::copy(tempbuf.begin()+offset, tempbuf.begin()+offset+templen[i], sendbuf.begin() + send_coldisp[i]);
                }
            }
        }
    }

    MPI_Barrier(commGrid->GetWorld());
    std::vector<VT> kthItem(n_thiscol);     // kth largest entry of every local column

    int root = commGrid->GetDiagOfProcCol();
    if(root==0 && colrank==0)   // the result already sits on the diagonal processor
    {
#pragma omp parallel for
        for(IT i=0; i<n_thiscol; i++)
        {
            IT nitems = send_coldisp[i+1]-send_coldisp[i];
            if(nitems >= k)
                kthItem[i] = sendbuf[send_coldisp[i]+k-1];
            else
                kthItem[i] = std::numeric_limits<VT>::min();   // fewer than k entries in the column
        }
    }
    else if(root>0 && colrank==0)   // send the result to the diagonal processor of this column
    {
#pragma omp parallel for
        for(IT i=0; i<n_thiscol; i++)
        {
            IT nitems = send_coldisp[i+1]-send_coldisp[i];
            if(nitems >= k)
                kthItem[i] = sendbuf[send_coldisp[i]+k-1];
            else
                kthItem[i] = std::numeric_limits<VT>::min();
        }
        MPI_Send(kthItem.data(), n_thiscol, MPIType<VT>(), root, 0, commGrid->GetColWorld());
    }
    else if(root>0 && colrank==root)
    {
        MPI_Recv(kthItem.data(), n_thiscol, MPIType<VT>(), 0, 0, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
    }

    // scatter the result from the diagonal processor along its processor row
    std::vector <int> sendcnts;
    std::vector <int> dpls;
    int proccols = commGrid->GetGridCols();
    IT n_perproc = n_thiscol / proccols;
    sendcnts.resize(proccols);
    std::fill(sendcnts.data(), sendcnts.data()+proccols-1, n_perproc);
    sendcnts[proccols-1] = n_thiscol - (n_perproc * (proccols-1));
    dpls.resize(proccols,0);
    std::partial_sum(sendcnts.data(), sendcnts.data()+proccols-1, dpls.data()+1);

    int rowroot = commGrid->GetDiagOfProcRow();
    int recvcnts = 0;
    MPI_Scatter(sendcnts.data(), 1, MPI_INT, &recvcnts, 1, MPI_INT, rowroot, commGrid->GetRowWorld());

    rvec.arr.resize(recvcnts);
    MPI_Scatterv(kthItem.data(), sendcnts.data(), dpls.data(), MPIType<VT>(), rvec.arr.data(), rvec.arr.size(), MPIType<VT>(), rowroot, commGrid->GetRowWorld());
    rvec.glen = getncol();
    return true;
}
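// The communication pattern above is a binary-tree (tournament) reduction along each processor
// column: in round p = 2, 4, 8, ... the ranks with colrank % p == p/2 ship their per-column
// top-k lists to colrank - p/2, which merges two sorted lists per column and keeps at most k
// entries. After log2(colneighs) rounds, column rank 0 holds the global top-k of every local
// column and can read off the kth item directly. A minimal sketch of the per-column merge step
// (illustrative only, not part of the library):
//
//   template <class VT>
//   std::vector<VT> merge_topk(const std::vector<VT>& a, const std::vector<VT>& b, size_t k)
//   {
//       std::vector<VT> out;    // a and b are sorted in decreasing order
//       std::merge(a.begin(), a.end(), b.begin(), b.end(), std::back_inserter(out), std::greater<VT>());
//       if(out.size() > k) out.resize(k);    // keep only the k largest
//       return out;
//   }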
template <class IT, class NT, class DER>
template <typename VT, typename GIT, typename _UnaryOperation>
bool SpParMat<IT,NT,DER>::Kselect1(FullyDistSpVec<GIT,VT> & rvec, IT k, _UnaryOperation __unary_op) const
{
    int myrank;
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
    if(*rvec.commGrid != *commGrid)
    {
        SpParHelper::Print("Grids are not comparable, SpParMat::Kselect() fails!", commGrid->GetWorld());
        MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
    }
    // ... (preliminary checks elided in this listing)

    IT n_thiscol = getlocalcols();
    MPI_Comm World = rvec.commGrid->GetWorld();
    MPI_Comm ColWorld = rvec.commGrid->GetColWorld();
    MPI_Comm RowWorld = rvec.commGrid->GetRowWorld();
    int colneighs = commGrid->GetGridRows();
    int colrank = commGrid->GetRankInProcCol();
    int coldiagrank = commGrid->GetDiagOfProcCol();

    // gather the indices of the active columns (the nonzeros of rvec) along the processor column
    int trxlocnz = 0;
    IT lenuntil = 0;
    int accnz = 0;
    int32_t *trxinds, *activeCols;
    VT *trxnums, *numacc = NULL;
    TransposeVector(World, rvec, trxlocnz, lenuntil, trxinds, trxnums, true);

    if(rvec.commGrid->GetGridRows() > 1)
    {
        AllGatherVector(ColWorld, trxlocnz, lenuntil, trxinds, trxnums, activeCols, numacc, accnz, true);
    }
    else
    {
        accnz = trxlocnz;
        activeCols = trxinds;
    }

    std::vector<bool> isactive(n_thiscol, false);
    for(int i=0; i<accnz ; i++)
    {
        isactive[activeCols[i]] = true;
    }
    IT nActiveCols = accnz;

    // budget: never more than min(local nonzeros in this processor column, k per active column)
    int64_t lannz = getlocalnnz();
    int64_t nnzColWorld = 0;
    MPI_Allreduce(&lannz, &nnzColWorld, 1, MPIType<int64_t>(), MPI_SUM, ColWorld);
    int64_t maxPerProcMemory = std::min(nnzColWorld, (int64_t)nActiveCols*k) * sizeof(VT);

    std::vector<IT> send_coldisp(n_thiscol+1, 0);
    std::vector<IT> local_coldisp(n_thiscol+1, 0);

    VT * sendbuf = static_cast<VT *> (::operator new (maxPerProcMemory));

    if(spSeq->getnnz()>0)
    {
        typename DER::SpColIter colit = spSeq->begcol();
        for(IT i=0; i<n_thiscol; ++i)
        {
            local_coldisp[i+1] = local_coldisp[i];
            send_coldisp[i+1] = send_coldisp[i];
            if( (colit != spSeq->endcol()) && (i==colit.colid()) )
            {
                if(isactive[i])     // only active columns carry data
                {
                    local_coldisp[i+1] += colit.nnz();
                    if(colit.nnz() >= k)
                        send_coldisp[i+1] += k;
                    else
                        send_coldisp[i+1] += colit.nnz();
                }
                ++colit;
            }
        }
    }

    // a (transformed) copy of the active part of the local matrix
    VT * localmat = static_cast<VT *> (::operator new (local_coldisp[n_thiscol]*sizeof(VT)));

    IT nzc = spSeq->getnzc();
#pragma omp parallel for
    for(IT i=0; i<nzc; i++)
    {
        typename DER::SpColIter colit = spSeq->begcol() + i;
        IT colid = colit.colid();
        if(isactive[colid])
        {
            IT idx = local_coldisp[colid];
            for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit < spSeq->endnz(colit); ++nzit)
            {
                localmat[idx++] = static_cast<VT>(__unary_op(nzit.value()));
            }
            if(colit.nnz() <= k)
            {
                sort(localmat+local_coldisp[colid], localmat+local_coldisp[colid+1], std::greater<VT>());
                std::copy(localmat+local_coldisp[colid], localmat+local_coldisp[colid+1], sendbuf+send_coldisp[colid]);
            }
            else
            {
                partial_sort(localmat+local_coldisp[colid], localmat+local_coldisp[colid]+k, localmat+local_coldisp[colid+1], std::greater<VT>());
                std::copy(localmat+local_coldisp[colid], localmat+local_coldisp[colid]+k, sendbuf+send_coldisp[colid]);
            }
        }
    }

    ::operator delete(localmat);
    std::vector<IT>().swap(local_coldisp);

    VT * recvbuf = static_cast<VT *> (::operator new (maxPerProcMemory));
    VT * tempbuf = static_cast<VT *> (::operator new (maxPerProcMemory));
    std::vector<IT> recv_coldisp(n_thiscol+1);
    std::vector<IT> temp_coldisp(n_thiscol+1);

    // tournament (binary-tree) reduction along the processor column, as in the dense Kselect1,
    // except that displacements are recomputed because only active columns carry data
    for(int p=2; p <= colneighs; p*=2)
    {
        if(colrank%p == p/2)    // sender in this round
        {
            int receiver = colrank - ceil(p/2);
            MPI_Send(send_coldisp.data(), n_thiscol+1, MPIType<IT>(), receiver, 0, commGrid->GetColWorld());
            MPI_Send(sendbuf, send_coldisp[n_thiscol], MPIType<VT>(), receiver, 1, commGrid->GetColWorld());
        }
        else if(colrank%p == 0)     // receiver in this round
        {
            int sender = colrank + ceil(p/2);
            if(sender < colneighs)
            {
                MPI_Recv(recv_coldisp.data(), n_thiscol+1, MPIType<IT>(), sender, 0, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
                MPI_Recv(recvbuf, recv_coldisp[n_thiscol], MPIType<VT>(), sender, 1, commGrid->GetColWorld(), MPI_STATUS_IGNORE);

                temp_coldisp[0] = 0;
                for(IT i=0; i<n_thiscol; ++i)
                {
                    IT sendlen = send_coldisp[i+1] - send_coldisp[i];
                    IT recvlen = recv_coldisp[i+1] - recv_coldisp[i];
                    IT templen = std::min((sendlen+recvlen), k);
                    temp_coldisp[i+1] = temp_coldisp[i] + templen;
                }

#pragma omp parallel for
                for(IT i=0; i<n_thiscol; ++i)   // merge the two sorted lists of column i, keep at most k
                {
                    IT j=send_coldisp[i], l=recv_coldisp[i];
                    IT lid = 0;
                    IT offset = temp_coldisp[i];

                    for(; j<send_coldisp[i+1] && l<recv_coldisp[i+1] && lid<k;)
                    {
                        if(sendbuf[j] > recvbuf[l])
                            tempbuf[offset+lid++] = sendbuf[j++];
                        else
                            tempbuf[offset+lid++] = recvbuf[l++];
                    }
                    while(j<send_coldisp[i+1] && lid<k) tempbuf[offset+lid++] = sendbuf[j++];
                    while(l<recv_coldisp[i+1] && lid<k) tempbuf[offset+lid++] = recvbuf[l++];
                }
                std::copy(temp_coldisp.begin(), temp_coldisp.end(), send_coldisp.begin());
                std::copy(tempbuf, tempbuf+temp_coldisp[n_thiscol], sendbuf);
            }
        }
    }
    // ... (intermediate bookkeeping elided in this listing)
    MPI_Barrier(commGrid->GetWorld());

    // column rank 0 now reads off the kth largest entry of every active column
    std::vector<VT> kthItem(nActiveCols);
#pragma omp parallel for
    for(IT i=0; i<nActiveCols; i++)
    {
        IT ai = activeCols[i];      // local index of the active column
        IT nitems = send_coldisp[ai+1]-send_coldisp[ai];
        if(nitems >= k)
            kthItem[i] = sendbuf[send_coldisp[ai]+k-1];
        else if(nitems == 0)
            kthItem[i] = std::numeric_limits<VT>::min();    // empty column
        else
            kthItem[i] = sendbuf[send_coldisp[ai+1]-1];     // fewer than k entries: take the smallest kept
    }

    // move the result from column rank 0 to the diagonal processor of this column
    if(coldiagrank>0 && colrank==0)
    {
        MPI_Send(kthItem.data(), nActiveCols, MPIType<VT>(), coldiagrank, 0, commGrid->GetColWorld());
    }
    else if(coldiagrank>0 && colrank==coldiagrank)
    {
        MPI_Recv(kthItem.data(), nActiveCols, MPIType<VT>(), 0, 0, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
    }

    // scatter from the diagonal processor along its processor row into the sparse result vector
    int rowroot = commGrid->GetDiagOfProcRow();
    int proccols = commGrid->GetGridCols();
    std::vector <int> sendcnts(proccols,0);
    std::vector <int> dpls(proccols,0);
    int lsize = rvec.ind.size();
    MPI_Gather(&lsize, 1, MPI_INT, sendcnts.data(), 1, MPI_INT, rowroot, RowWorld);
    std::partial_sum(sendcnts.data(), sendcnts.data()+proccols-1, dpls.data()+1);
    MPI_Scatterv(kthItem.data(), sendcnts.data(), dpls.data(), MPIType<VT>(), rvec.num.data(), rvec.num.size(), MPIType<VT>(), rowroot, RowWorld);

    delete [] activeCols;
    // ...
    ::operator delete(sendbuf);
    ::operator delete(recvbuf);
    ::operator delete(tempbuf);
    return true;
}
//! Bandwidth: the maximum distance of a nonzero from the main diagonal
template <class IT, class NT, class DER>
IT SpParMat<IT,NT,DER>::Bandwidth() const
{
    IT upperlBW = 0;
    IT lowerlBW = 0;
    IT m_perproc = getnrow() / commGrid->GetGridRows();
    IT n_perproc = getncol() / commGrid->GetGridCols();
    IT moffset = commGrid->GetRankInProcCol() * m_perproc;
    IT noffset = commGrid->GetRankInProcRow() * n_perproc;

    for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)
    {
        IT diagrow = colit.colid() + noffset;
        typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit);
        if(nzit != spSeq->endnz(colit))     // the column has at least one nonzero
        {
            IT firstrow = nzit.rowid() + moffset;
            IT lastrow = (nzit + colit.nnz()-1).rowid() + moffset;

            if(firstrow <= diagrow)     // the column has entries on or above the diagonal
            {
                IT dev = diagrow - firstrow;
                if(upperlBW < dev) upperlBW = dev;
            }
            if(lastrow >= diagrow)      // the column has entries on or below the diagonal
            {
                IT dev = lastrow - diagrow;
                if(lowerlBW < dev) lowerlBW = dev;
            }
        }
    }
    IT upperBW;
    MPI_Allreduce( &upperlBW, &upperBW, 1, MPIType<IT>(), MPI_MAX, commGrid->GetWorld());
    // ... (the lower bandwidth is not reduced here; elided in this listing)
    return (upperBW);
}
//! Profile: the sum over columns of the distance between the diagonal and the topmost nonzero
template <class IT, class NT, class DER>
IT SpParMat<IT,NT,DER>::Profile() const
{
    int colrank = commGrid->GetRankInProcRow();
    IT cols = getncol();
    IT rows = getnrow();
    IT m_perproc = cols / commGrid->GetGridRows();
    IT n_perproc = rows / commGrid->GetGridCols();
    IT moffset = commGrid->GetRankInProcCol() * m_perproc;
    IT noffset = colrank * n_perproc;

    int pc = commGrid->GetGridCols();
    IT n_thisproc;
    if(colrank != pc-1) n_thisproc = n_perproc;
    else n_thisproc = cols - (pc-1)*n_perproc;

    std::vector<IT> firstRowInCol(n_thisproc, getnrow());
    std::vector<IT> lastRowInCol(n_thisproc, -1);

    for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)
    {
        IT diagrow = colit.colid() + noffset;
        typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit);
        if(nzit != spSeq->endnz(colit))     // the column has at least one nonzero
        {
            IT firstrow = nzit.rowid() + moffset;
            IT lastrow = (nzit + colit.nnz()-1).rowid() + moffset;
            if(firstrow <= diagrow)
            {
                firstRowInCol[colit.colid()] = firstrow;
            }
            if(lastrow >= diagrow)
            {
                lastRowInCol[colit.colid()] = lastrow;
            }
        }
    }

    std::vector<IT> firstRowInCol_global(n_thisproc, getnrow());
    MPI_Allreduce( firstRowInCol.data(), firstRowInCol_global.data(), n_thisproc, MPIType<IT>(), MPI_MIN, commGrid->colWorld);

    IT profile = 0;
    for(IT i=0; i<n_thisproc; i++)
    {
        if(firstRowInCol_global[i]==rows)
        {
            // ... (no nonzero on or above the diagonal in this column; elided in the listing)
        }
        else
            profile += (i + noffset - firstRowInCol_global[i]);
    }

    IT profile_global = 0;
    MPI_Allreduce( &profile, &profile_global, 1, MPIType<IT>(), MPI_SUM, commGrid->rowWorld);

    return (profile_global);
}
// Column reduction restricted by a mask of row indices: delegate to the generalized version
// with the identity as the unary operation.
template <class IT, class NT, class DER>
template <typename VT, typename GIT, typename _BinaryOperation>
void SpParMat<IT,NT,DER>::MaskedReduce(FullyDistVec<GIT,VT> & rvec, FullyDistSpVec<GIT,VT> & mask, Dim dim, _BinaryOperation __binary_op, VT id, bool exclude) const
{
    MaskedReduce(rvec, mask, dim, __binary_op, id, myidentity<NT>(), exclude);
}
template <class IT, class NT, class DER>
template <typename VT, typename GIT, typename _BinaryOperation, typename _UnaryOperation>
void SpParMat<IT,NT,DER>::MaskedReduce(FullyDistVec<GIT,VT> & rvec, FullyDistSpVec<GIT,VT> & mask, Dim dim, _BinaryOperation __binary_op, VT id, _UnaryOperation __unary_op, bool exclude) const
{
    MPI_Comm World = commGrid->GetWorld();
    MPI_Comm ColWorld = commGrid->GetColWorld();
    MPI_Comm RowWorld = commGrid->GetRowWorld();
    // ... (only the Column reduction is handled; the dimension check is elided in this listing)

    if(*rvec.commGrid != *commGrid)
    {
        SpParHelper::Print("Grids are not comparable, SpParMat::MaskedReduce() fails!", commGrid->GetWorld());
        MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
    }

    // replicate the mask's row indices along the processor row
    int rowneighs = commGrid->GetGridCols();
    int rowrank = commGrid->GetRankInProcRow();
    std::vector<int> rownz(rowneighs);
    int locnnzMask = static_cast<int> (mask.getlocnnz());
    rownz[rowrank] = locnnzMask;
    MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, rownz.data(), 1, MPI_INT, RowWorld);
    std::vector<int> dpls(rowneighs+1,0);
    std::partial_sum(rownz.begin(), rownz.end(), dpls.data()+1);
    int accnz = std::accumulate(rownz.begin(), rownz.end(), 0);
    std::vector<GIT> sendInd(locnnzMask);
    std::transform(mask.ind.begin(), mask.ind.end(), sendInd.begin(), bind2nd(std::plus<GIT>(), mask.RowLenUntil()));

    std::vector<GIT> indMask(accnz);
    MPI_Allgatherv(sendInd.data(), rownz[rowrank], MPIType<GIT>(), indMask.data(), rownz.data(), dpls.data(), MPIType<GIT>(), RowWorld);

    // the remainder mirrors Reduce(..., Column, ...), but every nonzero is consulted against
    // the sorted mask indices: masked rows are skipped (exclude) or exclusively kept (!exclude)
    IT n_thiscol = getlocalcols();
    int colneighs = commGrid->GetGridRows();
    int colrank = commGrid->GetRankInProcCol();

    GIT * loclens = new GIT[colneighs];
    GIT * lensums = new GIT[colneighs+1]();

    GIT n_perproc = n_thiscol / colneighs;
    if(colrank == colneighs-1)
        loclens[colrank] = n_thiscol - (n_perproc*colrank);
    else
        loclens[colrank] = n_perproc;

    MPI_Allgather(MPI_IN_PLACE, 0, MPIType<GIT>(), loclens, 1, MPIType<GIT>(), commGrid->GetColWorld());
    std::partial_sum(loclens, loclens+colneighs, lensums+1);

    std::vector<VT> trarr;
    typename DER::SpColIter colit = spSeq->begcol();
    for(int i=0; i< colneighs; ++i)
    {
        VT * sendbuf = new VT[loclens[i]];
        std::fill(sendbuf, sendbuf+loclens[i], id);

        for(; colit != spSeq->endcol() && colit.colid() < lensums[i+1]; ++colit)
        {
            size_t k = 0;   // cursor into the sorted mask indices
            typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit);
            for(; nzit != spSeq->endnz(colit) && k < indMask.size(); )
            {
                if(nzit.rowid() < indMask[k])   // row is not in the mask
                {
                    if(exclude)
                    {
                        sendbuf[colit.colid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[colit.colid()-lensums[i]]);
                    }
                    ++nzit;
                }
                else if(nzit.rowid() > indMask[k]) ++k;
                else    // row is in the mask
                {
                    if(!exclude)
                    {
                        sendbuf[colit.colid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[colit.colid()-lensums[i]]);
                    }
                    ++k;
                    ++nzit;
                }
            }
            if(exclude)     // any remaining entries are past the last mask index, hence unmasked
            {
                while(nzit != spSeq->endnz(colit))
                {
                    sendbuf[colit.colid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[colit.colid()-lensums[i]]);
                    ++nzit;
                }
            }
        }
        VT * recvbuf = NULL;
        if(colrank == i)
        {
            trarr.resize(loclens[i]);
            recvbuf = SpHelper::p2a(trarr);
        }
        // ... (MPI_Reduce of sendbuf onto the owner, as in the column Reduce above)
        delete [] sendbuf;
    }
    DeleteAll(loclens, lensums);

    GIT trlen = trarr.size();
    int diagneigh = commGrid->GetComplementRank();
    IT reallen;
    MPI_Status status;
    MPI_Sendrecv(&trlen, 1, MPIType<IT>(), diagneigh, TRNNZ, &reallen, 1, MPIType<IT>(), diagneigh, TRNNZ, commGrid->GetWorld(), &status);

    rvec.arr.resize(reallen);
    MPI_Sendrecv(SpHelper::p2a(trarr), trlen, MPIType<VT>(), diagneigh, TRX, SpHelper::p2a(rvec.arr), reallen, MPIType<VT>(), diagneigh, TRX, commGrid->GetWorld(), &status);
    rvec.glen = getncol();
}
//! Type conversion: change the value type (NT -> NNT), keeping the index type
template <class IT, class NT, class DER>
template <typename NNT, typename NDER>
SpParMat<IT,NT,DER>::operator SpParMat<IT,NNT,NDER> () const
{
    NDER * convert = new NDER(*spSeq);
    return SpParMat<IT,NNT,NDER> (convert, commGrid);
}

//! Type conversion: change both the index and the value types
template <class IT, class NT, class DER>
template <typename NIT, typename NNT, typename NDER>
SpParMat<IT,NT,DER>::operator SpParMat<NIT,NNT,NDER> () const
{
    NDER * convert = new NDER(*spSeq);
    return SpParMat<NIT,NNT,NDER> (convert, commGrid);
}
//! Column indexing with special parallel semantics: the selection is applied to the local piece only
template <class IT, class NT, class DER>
SpParMat<IT,NT,DER> SpParMat<IT,NT,DER>::SubsRefCol(const std::vector<IT> & ci) const
{
    std::vector<IT> ri;     // an empty row index set means "all rows"
    DER * tempseq = new DER((*spSeq)(ri, ci));
    return SpParMat<IT,NT,DER> (tempseq, commGrid);
}
/**
 * Generalized sparse matrix indexing: A(ri,ci) is formed as P*A*Q^T where the boolean matrix P
 * is built from ri and Q from ci, using the supplied semiring types for the two products.
 */
template <class IT, class NT, class DER>
template <typename PTNTBOOL, typename PTBOOLNT>
SpParMat<IT,NT,DER> SpParMat<IT,NT,DER>::SubsRef_SR(const FullyDistVec<IT,IT> & ri, const FullyDistVec<IT,IT> & ci, bool inplace)
{
    typedef typename DER::LocalIT LIT;
    // ... (typedef of DER_IT, the bool-valued local matrix type with LIT indices, elided in this listing)

    if((*(ri.commGrid) != *(commGrid)) || (*(ci.commGrid) != *(commGrid)))
    {
        SpParHelper::Print("Grids are not comparable, SpRef fails!", commGrid->GetWorld());
        MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
    }

    IT locmax_ri = 0;
    IT locmax_ci = 0;
    if(!ri.arr.empty())
        locmax_ri = *std::max_element(ri.arr.begin(), ri.arr.end());
    if(!ci.arr.empty())
        locmax_ci = *std::max_element(ci.arr.begin(), ci.arr.end());

    IT totalm = getnrow();
    IT totaln = getncol();
    if(locmax_ri > totalm || locmax_ci > totaln)
    {
        throw outofrangeexception();
    }

    IT roffset = ri.RowLenUntil();
    IT rrowlen = ri.MyRowLength();
    IT coffset = ci.RowLenUntil();
    IT crowlen = ci.MyRowLength();

    // per-processor slice sizes along the processor row
    IT rowneighs = commGrid->GetGridCols();
    IT m_perproccol = totalm / rowneighs;
    IT n_perproccol = totaln / rowneighs;

    // the diagonal neighbor owns the transposed piece
    IT diagneigh = commGrid->GetComplementRank();
    IT mylocalrows = getlocalrows();
    IT mylocalcols = getlocalcols();
    IT trlocalrows;
    MPI_Status status;
    MPI_Sendrecv(&mylocalrows, 1, MPIType<IT>(), diagneigh, TRROWX, &trlocalrows, 1, MPIType<IT>(), diagneigh, TRROWX, commGrid->GetWorld(), &status);

    std::vector< std::vector<IT> > rowid(rowneighs);    // buckets, reused first for P and then for Q
    std::vector< std::vector<IT> > colid(rowneighs);

    // Step 1: create P from ri
    IT locvec = ri.arr.size();
    for(typename std::vector<IT>::size_type i=0; i< (unsigned)locvec; ++i)
    {
        // recipient along the processor row: the owner of row ri.arr[i]
        IT rowrec = (m_perproccol!=0) ? std::min(ri.arr[i] / m_perproccol, rowneighs-1) : (rowneighs-1);

        // P's row and column ids are the vector's index and value, respectively
        rowid[rowrec].push_back( i + roffset);
        colid[rowrec].push_back(ri.arr[i] - (rowrec * m_perproccol));
    }

    int * sendcnt = new int[rowneighs];
    int * recvcnt = new int[rowneighs];
    for(IT i=0; i<rowneighs; ++i)
        sendcnt[i] = rowid[i].size();

    MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetRowWorld());
    int * sdispls = new int[rowneighs]();
    int * rdispls = new int[rowneighs]();
    std::partial_sum(sendcnt, sendcnt+rowneighs-1, sdispls+1);
    std::partial_sum(recvcnt, recvcnt+rowneighs-1, rdispls+1);
    IT p_nnz = std::accumulate(recvcnt,recvcnt+rowneighs, static_cast<IT>(0));

    IT * p_rows = new IT[p_nnz];
    IT * p_cols = new IT[p_nnz];
    IT * senddata = new IT[locvec];
    for(int i=0; i<rowneighs; ++i)
    {
        std::copy(rowid[i].begin(), rowid[i].end(), senddata+sdispls[i]);
        std::vector<IT>().swap(rowid[i]);   // free the bucket
    }
    MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(), p_rows, recvcnt, rdispls, MPIType<IT>(), commGrid->GetRowWorld());

    for(int i=0; i<rowneighs; ++i)
    {
        std::copy(colid[i].begin(), colid[i].end(), senddata+sdispls[i]);
        std::vector<IT>().swap(colid[i]);
    }
    MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(), p_cols, recvcnt, rdispls, MPIType<IT>(), commGrid->GetRowWorld());
    delete [] senddata;

    std::tuple<LIT,LIT,bool> * p_tuples = new std::tuple<LIT,LIT,bool>[p_nnz];
    for(IT i=0; i< p_nnz; ++i)
    {
        p_tuples[i] = std::make_tuple(p_rows[i], p_cols[i], 1);
    }
    DeleteAll(p_rows, p_cols);

    DER_IT * PSeq = new DER_IT();
    PSeq->Create( p_nnz, rrowlen, trlocalrows, p_tuples);   // deletion of the tuples is handled by Create

    SpParMat<IT,NT,DER> PA (commGrid);
    if(&ri == &ci)      // symmetric permutation: no need to build Q at all
    {
        DeleteAll(sendcnt, recvcnt, sdispls, rdispls);
        SpParMat<IT,bool,DER_IT> P (PSeq, commGrid);
        if(inplace)
        {
            *this = Mult_AnXBn_DoubleBuff<PTBOOLNT, NT, DER>(P, *this, false, true);    // frees the old *this
            P.Transpose();
            *this = Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(*this, P, true, true);     // frees *this and P
            return SpParMat<IT,NT,DER>(commGrid);   // dummy return to match the signature
        }
        else
        {
            PA = Mult_AnXBn_DoubleBuff<PTBOOLNT, NT, DER>(P,*this);
            P.Transpose();
            return Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(PA, P);
        }
    }
    else
    {
        // intermediate step (to save memory): form PA before building Q
        SpParMat<IT,bool,DER_IT> P (PSeq, commGrid);
        PA = Mult_AnXBn_DoubleBuff<PTBOOLNT, NT, DER>(P, *this);
    }   // P is destructed here

    // Step 2: create Q from ci (reuse the count/displacement buffers)
    locvec = ci.arr.size();
    for(typename std::vector<IT>::size_type i=0; i< (unsigned)locvec; ++i)
    {
        IT rowrec = (n_perproccol!=0) ? std::min(ci.arr[i] / n_perproccol, rowneighs-1) : (rowneighs-1);

        rowid[rowrec].push_back( i + coffset);
        colid[rowrec].push_back(ci.arr[i] - (rowrec * n_perproccol));
    }
    for(IT i=0; i<rowneighs; ++i)
        sendcnt[i] = rowid[i].size();

    MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetRowWorld());
    std::fill(sdispls, sdispls+rowneighs, 0);
    std::fill(rdispls, rdispls+rowneighs, 0);
    std::partial_sum(sendcnt, sendcnt+rowneighs-1, sdispls+1);
    std::partial_sum(recvcnt, recvcnt+rowneighs-1, rdispls+1);
    IT q_nnz = std::accumulate(recvcnt,recvcnt+rowneighs, static_cast<IT>(0));

    IT * q_rows = new IT[q_nnz];
    IT * q_cols = new IT[q_nnz];
    senddata = new IT[locvec];
    for(int i=0; i<rowneighs; ++i)
    {
        std::copy(rowid[i].begin(), rowid[i].end(), senddata+sdispls[i]);
        std::vector<IT>().swap(rowid[i]);
    }
    MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(), q_rows, recvcnt, rdispls, MPIType<IT>(), commGrid->GetRowWorld());

    for(int i=0; i<rowneighs; ++i)
    {
        std::copy(colid[i].begin(), colid[i].end(), senddata+sdispls[i]);
        std::vector<IT>().swap(colid[i]);
    }
    MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(), q_cols, recvcnt, rdispls, MPIType<IT>(), commGrid->GetRowWorld());
    DeleteAll(senddata, sendcnt, recvcnt, sdispls, rdispls);

    std::tuple<LIT,LIT,bool> * q_tuples = new std::tuple<LIT,LIT,bool>[q_nnz];
    for(IT i=0; i< q_nnz; ++i)
    {
        q_tuples[i] = std::make_tuple(q_rows[i], q_cols[i], 1);
    }
    DeleteAll(q_rows, q_cols);
    DER_IT * QSeq = new DER_IT();
    QSeq->Create( q_nnz, crowlen, mylocalcols, q_tuples);

    // Step 3: multiply from the right by Q^T
    SpParMat<IT,bool,DER_IT> Q (QSeq, commGrid);
    Q.Transpose();
    if(inplace)
    {
        *this = Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(PA, Q, true, true);    // frees PA and Q
        return SpParMat<IT,NT,DER>(commGrid);   // dummy return to match the signature
    }
    else
    {
        return Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(PA, Q);
    }
}
// Single-dimension indexing: build a boolean selection matrix V from the index vector v and
// form either V*A (row selection) or A*V (column selection). The enclosing member-function
// name and exact parameter list are elided in this listing; v is a FullyDistVec of indices,
// dim selects the dimension, and inplace chooses whether *this is overwritten.
template <class IT, class NT, class DER>
template <typename PTNTBOOL, typename PTBOOLNT>
SpParMat<IT,NT,DER> SpParMat<IT,NT,DER>::/* name elided in the listing */(const FullyDistVec<IT,IT> & v, Dim dim, bool inplace)
{
    typedef typename DER::LocalIT LIT;
    // ... (typedef of DER_IT, the bool-valued local matrix type with LIT indices, elided in this listing)

    if (*(v.commGrid) != *commGrid)
    {
        SpParHelper::Print("Grids are not comparable, SpRef fails!", commGrid->GetWorld());
        MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
    }

    IT locmax = 0;
    if(!v.arr.empty())
        locmax = *std::max_element(v.arr.begin(), v.arr.end());

    IT offset = v.RowLenUntil();
    IT rowlen = v.MyRowLength();
    IT totalm = getnrow();
    IT totaln = getncol();
    IT rowneighs = commGrid->GetGridCols();

    IT perproccol;      // slice size of the selected dimension per processor column
    IT diagneigh;
    IT tmp, dimy;       // dimy: local column dimension of the V piece
    MPI_Status status;

    switch(dim)
    {
        case Row:
        {
            if (locmax > totalm)
                throw outofrangeexception();

            perproccol = totalm / rowneighs;
            diagneigh = commGrid->GetComplementRank();
            tmp = getlocalrows();
            MPI_Sendrecv(&tmp, 1, MPIType<IT>(), diagneigh, TRROWX,
                         &dimy, 1, MPIType<IT>(), diagneigh, TRROWX,
                         commGrid->GetWorld(), &status);
            break;
        }
        case Column:
        {
            if (locmax > totaln)
                throw outofrangeexception();

            perproccol = totaln / rowneighs;
            dimy = getlocalcols();
            break;
        }
        default:
            break;      // ...
    }

    // pack the vector entries into nonzeros of V, bucketed by owner along the processor row
    std::vector<std::vector<IT>> rowid(rowneighs);
    std::vector<std::vector<IT>> colid(rowneighs);
    IT locvec = v.arr.size();
    for(typename std::vector<IT>::size_type i = 0; i < (unsigned)locvec; ++i)
    {
        IT rowrec = (perproccol != 0)
            ? std::min(v.arr[i] / perproccol, rowneighs - 1)
            : (rowneighs - 1);

        rowid[rowrec].push_back(i + offset);
        colid[rowrec].push_back(v.arr[i] - (rowrec * perproccol));
    }

    // exchange the counts, then the data
    int *sendcnt = new int[rowneighs];
    int *recvcnt = new int[rowneighs];
    for (IT i = 0; i < rowneighs; ++i)
        sendcnt[i] = rowid[i].size();

    MPI_Alltoall(sendcnt, 1, MPI_INT,
                 recvcnt, 1, MPI_INT,
                 commGrid->GetRowWorld());

    int *sdispls = new int[rowneighs]();
    int *rdispls = new int[rowneighs]();
    std::partial_sum(sendcnt, sendcnt+rowneighs-1, sdispls+1);
    std::partial_sum(recvcnt, recvcnt+rowneighs-1, rdispls+1);
    IT v_nnz = std::accumulate(recvcnt, recvcnt+rowneighs, static_cast<IT>(0));

    IT *v_rows = new IT[v_nnz];
    IT *v_cols = new IT[v_nnz];
    IT *senddata = new IT[locvec];

    for(int i = 0; i < rowneighs; ++i)
    {
        std::copy(rowid[i].begin(), rowid[i].end(), senddata + sdispls[i]);
        std::vector<IT>().swap(rowid[i]);
    }
    MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(),
                  v_rows, recvcnt, rdispls, MPIType<IT>(),
                  commGrid->GetRowWorld());

    for(int i = 0; i < rowneighs; ++i)
    {
        std::copy(colid[i].begin(), colid[i].end(), senddata + sdispls[i]);
        std::vector<IT>().swap(colid[i]);
    }
    MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(),
                  v_cols, recvcnt, rdispls, MPIType<IT>(),
                  commGrid->GetRowWorld());
    DeleteAll(senddata, sendcnt, recvcnt, sdispls, rdispls);

    // build the local piece of the boolean selection matrix V
    std::tuple<LIT, LIT, bool> *v_tuples =
        new std::tuple<LIT, LIT, bool>[v_nnz];
    for(IT i = 0; i < v_nnz; ++i)
        v_tuples[i] = std::make_tuple(v_rows[i], v_cols[i], 1);
    DeleteAll(v_rows, v_cols);

    DER_IT *vseq = new DER_IT();
    vseq->Create(v_nnz, rowlen, dimy, v_tuples);
    SpParMat<IT,bool,DER_IT> V (vseq, commGrid);

    if(dim == Row)
    {
        // ... (orientation fix-ups on V, if any, are elided in this listing)
        if(inplace)
        {
            *this = Mult_AnXBn_DoubleBuff<PTBOOLNT, NT, DER>(V, *this, true, true);
            return SpParMat<IT,NT,DER>(commGrid);   // dummy return to match the signature
        }
        else
        {
            return Mult_AnXBn_DoubleBuff<PTBOOLNT, NT, DER>(V, *this);
        }
    }
    else    // Column
    {
        // ... (orientation fix-ups on V, if any, are elided in this listing)
        if(inplace)
        {
            *this = Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(*this, V, true, true);
            return SpParMat<IT,NT,DER>(commGrid);
        }
        else
        {
            return Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(*this, V);
        }
    }
}
template <class IT, class NT, class DER>
void SpParMat<IT,NT,DER>::SpAsgn(const FullyDistVec<IT,IT> & ri, const FullyDistVec<IT,IT> & ci, SpParMat<IT,NT,DER> & B)
{
    if((*(ri.commGrid) != *(B.commGrid)) || (*(ci.commGrid) != *(B.commGrid)))
    {
        SpParHelper::Print("Grids are not comparable, SpAsgn fails !", commGrid->GetWorld());
        MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
    }
    IT total_m_A = getnrow();
    IT total_n_A = getncol();
    IT total_m_B = B.getnrow();
    IT total_n_B = B.getncol();

    if(total_m_B != ri.TotalLength())
    {
        SpParHelper::Print("First dimension of B does NOT match the length of ri, SpAsgn fails !", commGrid->GetWorld());
        MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
    }
    if(total_n_B != ci.TotalLength())
    {
        SpParHelper::Print("Second dimension of B does NOT match the length of ci, SpAsgn fails !", commGrid->GetWorld());
        MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
    }

    // A(ri,ci) = B: the selected block is first cleared, then B is scattered into it via
    // boolean indexing matrices; the identity maps over B's dimensions are built with iota
    FullyDistVec<IT,IT> * rvec = new FullyDistVec<IT,IT>(ri.commGrid);
    rvec->iota(total_m_B, 0);   // rvec = 0, 1, 2, ...
    // ...
    FullyDistVec<IT,IT> * qvec = new FullyDistVec<IT,IT>(ci.commGrid);
    qvec->iota(total_n_B, 0);
    // ... (the remaining assembly steps are elided in this listing)
}
//! Prune (zero out) the A(ri,ci) submatrix of the matrix
template <class IT, class NT, class DER>
void SpParMat<IT,NT,DER>::Prune(const FullyDistVec<IT,IT> & ri, const FullyDistVec<IT,IT> & ci)
{
    typedef typename DER::LocalIT LIT;

    if((*(ri.commGrid) != *(commGrid)) || (*(ci.commGrid) != *(commGrid)))
    {
        SpParHelper::Print("Grids are not comparable, Prune fails!\n", commGrid->GetWorld());
        MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
    }

    IT locmax_ri = 0;
    IT locmax_ci = 0;
    if(!ri.arr.empty())
        locmax_ri = *std::max_element(ri.arr.begin(), ri.arr.end());
    if(!ci.arr.empty())
        locmax_ci = *std::max_element(ci.arr.begin(), ci.arr.end());

    IT total_m = getnrow();
    IT total_n = getncol();
    if(locmax_ri > total_m || locmax_ci > total_n)
    {
        throw outofrangeexception();
    }

    // Build boolean selection matrices S (from ri) and T (from ci); their construction is
    // elided in this listing. The selected block is then isolated with boolean products:
    SpParMat<IT,NT,DER> SA = Mult_AnXBn_DoubleBuff< BoolCopy2ndSRing<NT>, NT, DER>(S, *this, true, false);   // SA = S*A, frees S
    SpParMat<IT,NT,DER> SAT = Mult_AnXBn_DoubleBuff< BoolCopy1stSRing<NT>, NT, DER>(SA, T, true, true);      // SAT = S*A*T, frees SA and T
    // ... (the entries of SAT are then removed from *this; elided in this listing)
}
// Overload of Prune over an (ri, ci) index pair (exact signature elided in this listing);
// it isolates the selected rows and columns with boolean products like the version above.
template <class IT, class NT, class DER>
/* return type and name elided in the listing */(const FullyDistVec<IT,IT> & ri, const FullyDistVec<IT,IT> & ci)
{
    typedef typename DER::LocalIT LIT;

    if((*(ri.commGrid) != *(commGrid)) || (*(ci.commGrid) != *(commGrid)))
    {
        SpParHelper::Print("Grids are not comparable, Prune fails!\n", commGrid->GetWorld());
        MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
    }

    IT locmax_ri = 0;
    IT locmax_ci = 0;
    if(!ri.arr.empty())
        locmax_ri = *std::max_element(ri.arr.begin(), ri.arr.end());
    if(!ci.arr.empty())
        locmax_ci = *std::max_element(ci.arr.begin(), ci.arr.end());

    IT total_m = getnrow();
    IT total_n = getncol();
    if(locmax_ri > total_m || locmax_ci > total_n)
    {
        throw outofrangeexception();
    }

    // Build boolean selection matrices S (from ri) and T (from ci); their construction is
    // elided in this listing. The row and column selections are formed separately:
    SpParMat<IT,NT,DER> SA = Mult_AnXBn_DoubleBuff< BoolCopy2ndSRing<NT>, NT, DER>(S, *this, true, false);   // SA = S*A, frees S
    SpParMat<IT,NT,DER> AT = Mult_AnXBn_DoubleBuff< BoolCopy1stSRing<NT>, NT, DER>(*this, T, false, true);   // AT = A*T, frees T
    // ... (the remaining combination steps are elided in this listing)
}
template <class IT, class NT, class DER>
template <typename _BinaryOperation>
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
MPI_Comm World = pvals.commGrid->GetWorld();
if(getncol() != pvals.TotalLength())
{
std::ostringstream outs;
outs << "Cannot prune column-by-column, dimensions do not match" << std::endl;
outs << getncol() << " != " << pvals.TotalLength() << std::endl;
}
if(! ( *(getcommgrid()) == *(pvals.getcommgrid())) )
{
std::cout << "Grids are not comparable for PruneColumn" << std::endl;
}
MPI_Comm ColWorld = pvals.commGrid->GetColWorld();
int diagneigh = pvals.commGrid->GetComplementRank();
MPI_Sendrecv(&xsize, 1, MPI_INT, diagneigh, TRX, &trxsize, 1, MPI_INT, diagneigh, TRX, World, &status);
NT * trxnums = new NT[trxsize];
MPI_Sendrecv(const_cast<NT*>(SpHelper::p2a(pvals.arr)), xsize, MPIType<NT>(), diagneigh, TRX, trxnums, trxsize, MPIType<NT>(), diagneigh, TRX, World, &status);
int colneighs, colrank;
MPI_Comm_size(ColWorld, &colneighs);
MPI_Comm_rank(ColWorld, &colrank);
int * colsize = new int[colneighs];
colsize[colrank] = trxsize;
MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colsize, 1, MPI_INT, ColWorld);
int * dpls = new int[colneighs]();
std::partial_sum(colsize, colsize+colneighs-1, dpls+1);
int accsize = std::accumulate(colsize, colsize+colneighs, 0);
std::vector<NT> numacc(accsize);
#ifdef COMBBLAS_DEBUG
std::ostringstream outs2;
outs2 << "PruneColumn displacements: ";
for(int i=0; i< colneighs; ++i)
{
outs2 << dpls[i] << " ";
}
#endif
MPI_Allgatherv(trxnums, trxsize, MPIType<NT>(), numacc.data(), colsize, dpls, MPIType<NT>(), ColWorld);
if(accsize != getlocalcols())
{
fprintf(stderr, "[PruneColumn]\tmyrank:%d\taccsize:%d\tgetlocalcols():%d\n", myrank, accsize, getlocalcols());
}
assert(accsize == getlocalcols());
spSeq->PruneColumn(numacc.data(), __binary_op, inPlace);
return SpParMat<IT,NT,DER>(spSeq->PruneColumn(numacc.data(), __binary_op, inPlace), commGrid);
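// Sketch (illustration only): PruneColumn needs one threshold per *local column*, but pvals is
// distributed along process rows, so the code above first swaps each rank's slice with its
// transpose partner (GetComplementRank()) and then Allgathervs the pieces within the process
// column. A minimal standalone version of that swap-then-allgather pattern, assuming the caller
// already has the world communicator, the column subcommunicator, and the partner rank (the
// helper name and the use of double are hypothetical):
#include <mpi.h>
#include <vector>
#include <numeric>
std::vector<double> gather_along_proc_column(MPI_Comm world, MPI_Comm colcomm, int partner,
                                             const std::vector<double> & myslice)
{
    int xsize = (int) myslice.size(), trxsize = 0;
    MPI_Sendrecv(&xsize, 1, MPI_INT, partner, 0, &trxsize, 1, MPI_INT, partner, 0,
                 world, MPI_STATUS_IGNORE);
    std::vector<double> trx(trxsize);
    MPI_Sendrecv(myslice.data(), xsize, MPI_DOUBLE, partner, 1,
                 trx.data(), trxsize, MPI_DOUBLE, partner, 1, world, MPI_STATUS_IGNORE);
    int colneighs, colrank;
    MPI_Comm_size(colcomm, &colneighs);
    MPI_Comm_rank(colcomm, &colrank);
    std::vector<int> sizes(colneighs), displs(colneighs, 0);
    sizes[colrank] = trxsize;
    MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, sizes.data(), 1, MPI_INT, colcomm);
    std::partial_sum(sizes.begin(), sizes.end()-1, displs.begin()+1);
    std::vector<double> acc(std::accumulate(sizes.begin(), sizes.end(), 0));
    MPI_Allgatherv(trx.data(), trxsize, MPI_DOUBLE, acc.data(), sizes.data(), displs.data(),
                   MPI_DOUBLE, colcomm);
    return acc;   // one value per local column of this process
}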
template <class IT, class NT, class DER>
template <class IRRELEVANT_NT>
MPI_Comm World = ci.commGrid->GetWorld();
if (getncol() != ci.TotalLength())
{
std::ostringstream outs;
outs << "Cannot prune column-by-column, dimensions do not match" << std::endl;
outs << getncol() << " != " << ci.TotalLength() << std::endl;
}
std::cout << "Grids are not comparable for PruneColumnByIndex" << std::endl;
MPI_Comm ColWorld = ci.commGrid->GetColWorld();
int diagneigh = ci.commGrid->GetComplementRank();
IT xrofst = ci.RowLenUntil();
MPI_Sendrecv(&xrofst, 1, MPIType<IT>(), diagneigh, TROST, &trxrofst, 1, MPIType<IT>(), diagneigh, TROST, World, MPI_STATUS_IGNORE);
MPI_Sendrecv(&xlocnz, 1, MPIType<IT>(), diagneigh, TRNNZ, &trxlocnz, 1, MPIType<IT>(), diagneigh, TRNNZ, World, MPI_STATUS_IGNORE);
std::vector<IT> trxinds(trxlocnz);
MPI_Sendrecv(ci.ind.data(), xlocnz, MPIType<IT>(), diagneigh, TRI, trxinds.data(), trxlocnz, MPIType<IT>(), diagneigh, TRI, World, MPI_STATUS_IGNORE);
std::transform(trxinds.data(), trxinds.data() + trxlocnz, trxinds.data(), std::bind2nd(std::plus<IT>(), trxrofst));
int colneighs, colrank;
MPI_Comm_size(ColWorld, &colneighs);
MPI_Comm_rank(ColWorld, &colrank);
std::vector<int> colnz(colneighs);
colnz[colrank] = trxlocnz;
MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colnz.data(), 1, MPI_INT, ColWorld);
std::vector<int> dpls(colneighs, 0);
std::partial_sum(colnz.begin(), colnz.end()-1, dpls.begin()+1);
IT accnz = std::accumulate(colnz.begin(), colnz.end(), 0);
std::vector<IT> indacc(accnz);
MPI_Allgatherv(trxinds.data(), trxlocnz, MPIType<IT>(), indacc.data(), colnz.data(), dpls.data(), MPIType<IT>(), ColWorld);
std::sort(indacc.begin(), indacc.end());
spSeq->PruneColumnByIndex(indacc);
template <class IT, class NT, class DER>
template <typename _BinaryOperation>
MPI_Comm World = pvals.commGrid->GetWorld();
if(getncol() != pvals.TotalLength())
{
std::ostringstream outs;
outs << "Cannot prune column-by-column, dimensions do not match" << std::endl;
outs << getncol() << " != " << pvals.TotalLength() << std::endl;
}
if(! ( *(getcommgrid()) == *(pvals.getcommgrid())) )
{
std::cout << "Grids are not comparable for PruneColumn" << std::endl;
}
MPI_Comm ColWorld = pvals.commGrid->GetColWorld();
int diagneigh = pvals.commGrid->GetComplementRank();
IT roffst = pvals.RowLenUntil();
MPI_Sendrecv(&roffst, 1, MPIType<IT>(), diagneigh, TROST, &roffset, 1, MPIType<IT>(), diagneigh, TROST, World, &status);
MPI_Sendrecv(&xlocnz, 1, MPIType<IT>(), diagneigh, TRNNZ, &trxlocnz, 1, MPIType<IT>(), diagneigh, TRNNZ, World, &status);
std::vector<IT> trxinds (trxlocnz);
std::vector<NT> trxnums (trxlocnz);
MPI_Sendrecv(pvals.ind.data(), xlocnz, MPIType<IT>(), diagneigh, TRI, trxinds.data(), trxlocnz, MPIType<IT>(), diagneigh, TRI, World, &status);
MPI_Sendrecv(pvals.num.data(), xlocnz, MPIType<NT>(), diagneigh, TRX, trxnums.data(), trxlocnz, MPIType<NT>(), diagneigh, TRX, World, &status);
std::transform(trxinds.data(), trxinds.data()+trxlocnz, trxinds.data(), std::bind2nd(std::plus<IT>(), roffset));
int colneighs, colrank;
MPI_Comm_size(ColWorld, &colneighs);
MPI_Comm_rank(ColWorld, &colrank);
int * colnz = new int[colneighs];
colnz[colrank] = trxlocnz;
MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colnz, 1, MPI_INT, ColWorld);
int * dpls = new int[colneighs]();
std::partial_sum(colnz, colnz+colneighs-1, dpls+1);
IT accnz = std::accumulate(colnz, colnz+colneighs, 0);
std::vector<IT> indacc(accnz);
std::vector<NT> numacc(accnz);
MPI_Allgatherv(trxinds.data(), trxlocnz, MPIType<IT>(), indacc.data(), colnz, dpls, MPIType<IT>(), ColWorld);
MPI_Allgatherv(trxnums.data(), trxlocnz, MPIType<NT>(), numacc.data(), colnz, dpls, MPIType<NT>(), ColWorld);
spSeq->PruneColumn(indacc.data(), numacc.data(), __binary_op, inPlace);
return SpParMat<IT,NT,DER>(spSeq->PruneColumn(indacc.data(), numacc.data(), __binary_op, inPlace), commGrid);
template <class IT, class NT, class DER>
if(*commGrid == *rhs.commGrid)
spSeq->EWiseMult(*(rhs.spSeq), exclude);
else
std::cout << "Grids are not comparable, EWiseMult() fails!" << std::endl;

template <class IT, class NT, class DER>
if(*commGrid == *rhs.commGrid)
spSeq->SetDifference(*(rhs.spSeq));
else
std::cout << "Grids are not comparable, SetDifference() fails!" << std::endl;

template <class IT, class NT, class DER>
if(*commGrid == *rhs.commGrid)
spSeq->EWiseScale(rhs.array, rhs.m, rhs.n);
else
std::cout << "Grids are not comparable, EWiseScale() fails!" << std::endl;

template <class IT, class NT, class DER>
template <typename _BinaryOperation>
if(*commGrid == *rhs.commGrid)
{
if(getlocalrows() == rhs.m && getlocalcols() == rhs.n)
spSeq->UpdateDense(rhs.array, __binary_op);
else
std::cout << "Matrices have different dimensions, UpdateDense() fails!" << std::endl;
}
else
std::cout << "Grids are not comparable, UpdateDense() fails!" << std::endl;

template <class IT, class NT, class DER>
if (commGrid->myrank == 0)
std::cout << "As a whole: " << mm << " rows and " << nn << " columns and " << nznz << " nonzeros" << std::endl;
IT allprocs = commGrid->grrows * commGrid->grcols;
for(IT i=0; i< allprocs; ++i)
{
if (commGrid->myrank == i)
std::cout << "Processor (" << commGrid->GetRankInProcRow() << "," << commGrid->GetRankInProcCol() << ")'s data: " << std::endl;
MPI_Barrier(commGrid->GetWorld());
}

template <class IT, class NT, class DER>
int local = static_cast<int>((*spSeq) == (*(rhs.spSeq)));
MPI_Allreduce( &local, &whole, 1, MPI_INT, MPI_BAND, commGrid->GetWorld());
return static_cast<bool>(whole);
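// Sketch (illustration only): operator== above folds the per-process comparison into a global
// verdict with a bitwise AND over one int per rank; a logical AND works just as well. Standalone
// version of that reduction (the helper name is hypothetical):
#include <mpi.h>
bool globally_equal(MPI_Comm comm, bool locally_equal)
{
    int local = locally_equal ? 1 : 0;
    int global = 0;
    MPI_Allreduce(&local, &global, 1, MPI_INT, MPI_LAND, comm);
    return global != 0;
}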
template <class IT, class NT, class DER>
template <typename _BinaryOperation, typename LIT>
int nprocs = commGrid->GetSize();
int * sendcnt = new int[nprocs];
int * recvcnt = new int[nprocs];
for(int i=0; i<nprocs; ++i)
sendcnt[i] = data[i].size();
MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld());
int * sdispls = new int[nprocs]();
int * rdispls = new int[nprocs]();
std::partial_sum(sendcnt, sendcnt+nprocs-1, sdispls+1);
std::partial_sum(recvcnt, recvcnt+nprocs-1, rdispls+1);
IT totrecv = std::accumulate(recvcnt, recvcnt+nprocs, static_cast<IT>(0));
IT totsent = std::accumulate(sendcnt, sendcnt+nprocs, static_cast<IT>(0));
assert((totsent < std::numeric_limits<int>::max()));
assert((totrecv < std::numeric_limits<int>::max()));
commGrid->OpenDebugFile("Displacements", oput);
copy(sdispls, sdispls+nprocs, ostream_iterator<int>(oput, " ")); oput << endl;
copy(rdispls, rdispls+nprocs, ostream_iterator<int>(oput, " ")); oput << endl;
if(commGrid->GetRank() == 0) gsizes = new IT[nprocs];
MPI_Gather(&totrecv, 1, MPIType<IT>(), gsizes, 1, MPIType<IT>(), 0, commGrid->GetWorld());
if(commGrid->GetRank() == 0) { std::copy(gsizes, gsizes+nprocs, std::ostream_iterator<IT>(std::cout, " ")); std::cout << std::endl; }
MPI_Barrier(commGrid->GetWorld());
MPI_Gather(&totsent, 1, MPIType<IT>(), gsizes, 1, MPIType<IT>(), 0, commGrid->GetWorld());
if(commGrid->GetRank() == 0) { copy(gsizes, gsizes+nprocs, ostream_iterator<IT>(cout, " ")); cout << endl; }
MPI_Barrier(commGrid->GetWorld());
if(commGrid->GetRank() == 0)
delete [] gsizes;
std::tuple<LIT,LIT,NT> * senddata = new std::tuple<LIT,LIT,NT>[locsize];
for(int i=0; i<nprocs; ++i)
{
std::copy(data[i].begin(), data[i].end(), senddata+sdispls[i]);
data[i].shrink_to_fit();
}
MPI_Datatype MPI_triple;
MPI_Type_contiguous(sizeof(std::tuple<LIT,LIT,NT>), MPI_CHAR, &MPI_triple);
MPI_Type_commit(&MPI_triple);
std::tuple<LIT,LIT,NT> * recvdata = new std::tuple<LIT,LIT,NT>[totrecv];
MPI_Alltoallv(senddata, sendcnt, sdispls, MPI_triple, recvdata, recvcnt, rdispls, MPI_triple, commGrid->GetWorld());
DeleteAll(senddata, sendcnt, recvcnt, sdispls, rdispls);
MPI_Type_free(&MPI_triple);
int r = commGrid->GetGridRows();
int s = commGrid->GetGridCols();
IT m_perproc = total_m / r;
IT n_perproc = total_n / s;
int myprocrow = commGrid->GetRankInProcCol();
int myproccol = commGrid->GetRankInProcRow();
IT locrows, loccols;
if(myprocrow != r-1) locrows = m_perproc;
else locrows = total_m - myprocrow * m_perproc;
if(myproccol != s-1) loccols = n_perproc;
else loccols = total_n - myproccol * n_perproc;
A.RemoveDuplicates(BinOp);
spSeq = new DER(A, false);
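// Sketch (illustration only): the exchange above is the classic two-step personalized
// communication -- MPI_Alltoall of per-destination counts, then MPI_Alltoallv of the packed
// (row, col, value) triples shipped as raw bytes through an MPI_Type_contiguous wrapper.
// A minimal standalone version for a fixed triple type (names are hypothetical):
#include <mpi.h>
#include <vector>
#include <numeric>
#include <tuple>
#include <algorithm>
#include <cstdint>
using Triple = std::tuple<int64_t, int64_t, double>;
std::vector<Triple> exchange_triples(MPI_Comm comm, std::vector<std::vector<Triple>> & outgoing)
{
    int nprocs;
    MPI_Comm_size(comm, &nprocs);
    std::vector<int> sendcnt(nprocs), recvcnt(nprocs), sdispls(nprocs, 0), rdispls(nprocs, 0);
    for(int i = 0; i < nprocs; ++i) sendcnt[i] = (int) outgoing[i].size();
    MPI_Alltoall(sendcnt.data(), 1, MPI_INT, recvcnt.data(), 1, MPI_INT, comm);
    std::partial_sum(sendcnt.begin(), sendcnt.end()-1, sdispls.begin()+1);
    std::partial_sum(recvcnt.begin(), recvcnt.end()-1, rdispls.begin()+1);
    std::vector<Triple> sendbuf(std::accumulate(sendcnt.begin(), sendcnt.end(), 0));
    for(int i = 0; i < nprocs; ++i)
        std::copy(outgoing[i].begin(), outgoing[i].end(), sendbuf.begin() + sdispls[i]);
    std::vector<Triple> recvbuf(std::accumulate(recvcnt.begin(), recvcnt.end(), 0));
    MPI_Datatype MPI_triple;
    MPI_Type_contiguous(sizeof(Triple), MPI_CHAR, &MPI_triple);   // ship the POD triple as bytes
    MPI_Type_commit(&MPI_triple);
    MPI_Alltoallv(sendbuf.data(), sendcnt.data(), sdispls.data(), MPI_triple,
                  recvbuf.data(), recvcnt.data(), rdispls.data(), MPI_triple, comm);
    MPI_Type_free(&MPI_triple);
    return recvbuf;
}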
2972template <
class IT,
class NT,
class DER>
2973std::vector<std::vector<SpParMat<IT, NT, DER>>>
2976 IT g_nr = this->getnrow();
2977 IT g_nc = this->getncol();
2979 if (br == 1 && bc == 1 || (br > g_nr || bc > g_nc))
2980 return std::vector<std::vector<SpParMat<IT, NT, DER>>>
2981 (1, std::vector<SpParMat<IT, NT, DER>>(1, *
this));
2983 int np = commGrid->GetSize();
2984 int rank = commGrid->GetRank();
2986 std::vector<std::vector<SpParMat<IT, NT, DER>>>
2990 std::vector<std::vector<std::vector<std::vector<std::tuple<IT, IT, NT>>>>>
2992 std::vector<std::vector<std::vector<std::tuple<IT, IT, NT>>>>
2993 (bc, std::vector<std::vector<std::tuple<IT, IT, NT>>>
2994 (np, std::vector<std::tuple<IT, IT, NT>>())));
2996 assert(spSeq != NULL);
2999 IT g_rbeg = (g_nr/commGrid->GetGridRows()) * commGrid->GetRankInProcCol();
3000 IT g_cbeg = (g_nc/commGrid->GetGridCols()) * commGrid->GetRankInProcRow();
3001 IT br_sz = g_nr / br;
3002 IT br_r = g_nr % br;
3003 IT bc_sz = g_nc / bc;
3004 IT bc_r = g_nc % bc;
3006 std::vector<IT> br_sizes(br, br_sz);
3007 std::vector<IT> bc_sizes(bc, bc_sz);
3008 for (
IT i = 0; i < br_r; ++i)
3010 for (
IT i = 0; i < bc_r; ++i)
3013 auto get_block = [](
IT x,
IT sz,
IT r,
IT &bid,
IT &idx)
3034 IT rbid, ridx, ridx_l, cbid, cidx, cidx_l;
3035 get_block(g_ridx, br_sz, br_r, rbid, ridx);
3036 get_block(g_cidx, bc_sz, bc_r, cbid, cidx);
3037 int owner = Owner(br_sizes[rbid], bc_sizes[cbid], ridx, cidx,
3040 btuples[rbid][cbid][owner].push_back({ridx_l, cidx_l, tuples.
numvalue(i)});
3045 for (
int i = 0; i < br; ++i)
3047 for (
int j = 0; j < bc; ++j)
3050 for (
auto &el : btuples[i][j])
3051 locsize += el.size();
3053 auto &M = bmats[i][j];
3054 M.SparseCommon(btuples[i][j], locsize, br_sizes[i], bc_sizes[j],
template <class IT, class NT, class DER>
if((*(distrows.commGrid) != *(distcols.commGrid)) || (*(distcols.commGrid) != *(distvals.commGrid)))
if((distrows.TotalLength() != distcols.TotalLength()) || (distcols.TotalLength() != distvals.TotalLength()))
commGrid = distrows.commGrid;
int nprocs = commGrid->GetSize();
std::vector< std::vector < std::tuple<IT,IT,NT> > > data(nprocs);
for(IT i=0; i<locsize; ++i)
{
int owner = Owner(total_m, total_n, distrows.arr[i], distcols.arr[i], lrow, lcol);
data[owner].push_back(std::make_tuple(lrow, lcol, distvals.arr[i]));
}
SparseCommon(data, locsize, total_m, total_n, std::plus<NT>());
SparseCommon(data, locsize, total_m, total_n, maximum<NT>());

template <class IT, class NT, class DER>
if((*(distrows.commGrid) != *(distcols.commGrid)) )
if((distrows.TotalLength() != distcols.TotalLength()) )
commGrid = distrows.commGrid;
int nprocs = commGrid->GetSize();
std::vector< std::vector < std::tuple<IT,IT,NT> > > data(nprocs);
for(IT i=0; i<locsize; ++i)
{
int owner = Owner(total_m, total_n, distrows.arr[i], distcols.arr[i], lrow, lcol);
data[owner].push_back(std::make_tuple(lrow, lcol, val));
}
SparseCommon(data, locsize, total_m, total_n, std::plus<NT>());
SparseCommon(data, locsize, total_m, total_n, maximum<NT>());
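// Sketch (illustration only): the constructors above route every global (row, col) pair to its
// owner with Owner(), which encodes the 2D block distribution used throughout this class: the
// first r-1 process rows hold floor(m/r) rows each and the last row takes the remainder
// (likewise for columns). A standalone version of that mapping; the row-major rank numbering
// and the explicit grid arguments are assumptions, the real Owner() reads them from CommGrid:
#include <cstdint>
#include <algorithm>
int owner_2d(int64_t m, int64_t n, int64_t gi, int64_t gj, int procrows, int proccols,
             int64_t & li, int64_t & lj)
{
    int64_t m_perproc = m / procrows;
    int64_t n_perproc = n / proccols;
    int prow = (m_perproc != 0) ? std::min((int)(gi / m_perproc), procrows - 1) : procrows - 1;
    int pcol = (n_perproc != 0) ? std::min((int)(gj / n_perproc), proccols - 1) : proccols - 1;
    li = gi - prow * m_perproc;       // local row index on the owner
    lj = gj - pcol * n_perproc;       // local column index on the owner
    return prow * proccols + pcol;    // assumed row-major rank within the grid
}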
3138template <
class IT,
class NT,
class DER>
3139template <
class DELIT>
3143 typedef typename DER::LocalIT LIT;
3145 int nprocs = commGrid->GetSize();
3146 int gridrows = commGrid->GetGridRows();
3147 int gridcols = commGrid->GetGridCols();
3148 std::vector< std::vector<LIT> > data(
nprocs);
3153 if(
sizeof(LIT) <
sizeof(DELIT))
3155 std::ostringstream outs;
3156 outs <<
"Warning: Using smaller indices for the matrix than DistEdgeList\n";
3157 outs <<
"Local matrices are " << m_perproc <<
"-by-" << n_perproc << std::endl;
3164 int64_t perstage = DEL.nedges / stages;
3166 std::vector<LIT> alledges;
3168 for(LIT s=0; s< stages; ++s)
3171 int64_t n_after= ((s==(stages-1))? DEL.nedges : ((s+1)*perstage));
3178 for (
int64_t i = n_befor; i < n_after; i++)
3180 int64_t fr = get_v0_from_edge(&(DEL.pedges[i]));
3181 int64_t to = get_v1_from_edge(&(DEL.pedges[i]));
3183 if(fr >= 0 && to >= 0)
3187 data[owner].push_back(lrow);
3188 data[owner].push_back(lcol);
3195 for (
int64_t i = n_befor; i < n_after; i++)
3197 if(DEL.edges[2*i+0] >= 0 && DEL.edges[2*i+1] >= 0)
3200 int owner = Owner(DEL.
getGlobalV(), DEL.
getGlobalV(), DEL.edges[2*i+0], DEL.edges[2*i+1], lrow, lcol);
3201 data[owner].push_back(lrow);
3202 data[owner].push_back(lcol);
3208 LIT * sendbuf =
new LIT[2*realedges];
3209 int * sendcnt =
new int[
nprocs];
3210 int * sdispls =
new int[
nprocs];
3211 for(
int i=0; i<
nprocs; ++i)
3212 sendcnt[i] = data[i].
size();
3214 int * rdispls =
new int[
nprocs];
3215 int * recvcnt =
new int[
nprocs];
3216 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT,commGrid->GetWorld());
3220 for(
int i=0; i<
nprocs-1; ++i)
3222 sdispls[i+1] = sdispls[i] + sendcnt[i];
3223 rdispls[i+1] = rdispls[i] + recvcnt[i];
3225 for(
int i=0; i<
nprocs; ++i)
3226 std::copy(data[i].begin(), data[i].end(), sendbuf+sdispls[i]);
3229 for(
int i=0; i<
nprocs; ++i)
3230 std::vector<LIT>().swap(data[i]);
3234 IT thisrecv = std::accumulate(recvcnt,recvcnt+
nprocs,
static_cast<IT>(0));
3235 LIT * recvbuf =
new LIT[thisrecv];
3236 totrecv += thisrecv;
3238 MPI_Alltoallv(sendbuf, sendcnt, sdispls, MPIType<LIT>(), recvbuf, recvcnt, rdispls, MPIType<LIT>(), commGrid->GetWorld());
3239 DeleteAll(sendcnt, recvcnt, sdispls, rdispls,sendbuf);
3240 std::copy (recvbuf,recvbuf+thisrecv,std::back_inserter(alledges));
3244 int myprocrow = commGrid->GetRankInProcCol();
3245 int myproccol = commGrid->GetRankInProcRow();
3246 LIT locrows, loccols;
3247 if(myprocrow != gridrows-1) locrows = m_perproc;
3248 else locrows = DEL.
getGlobalV() - myprocrow * m_perproc;
3249 if(myproccol != gridcols-1) loccols = n_perproc;
3250 else loccols = DEL.
getGlobalV() - myproccol * n_perproc;
3253 spSeq =
new DER(
A,
false);
3256template <
class IT,
class NT,
class DER>
3259 MPI_Comm DiagWorld = commGrid->GetDiagWorld();
3262 if(DiagWorld != MPI_COMM_NULL)
3264 typedef typename DER::LocalIT LIT;
3268 spSeq =
new DER(tuples,
false);
3270 MPI_Allreduce( &removed, & totrem, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());
3276template <
class IT,
class NT,
class DER>
3279 MPI_Comm DiagWorld = commGrid->GetDiagWorld();
3280 if(DiagWorld != MPI_COMM_NULL)
3282 typedef typename DER::LocalIT LIT;
3285 tuples.
AddLoops(loopval, replaceExisting);
3287 spSeq =
new DER(tuples,
false);
3293template <
class IT,
class NT,
class DER>
3298 if(*loopvals.commGrid != *commGrid)
3300 SpParHelper::Print(
"Grids are not comparable, SpParMat::AddLoops() fails!\n", commGrid->GetWorld());
3303 if (getncol()!= loopvals.TotalLength())
3305 SpParHelper::Print(
"The number of entries in loopvals is not equal to the number of diagonal entries.\n");
3311 int rowProcs = commGrid->GetGridCols();
3312 std::vector<int> recvcnt(rowProcs, 0);
3313 std::vector<int> rdpls(rowProcs, 0);
3314 MPI_Gather(&locsize, 1, MPI_INT, recvcnt.data(), 1, MPI_INT, commGrid->GetDiagOfProcRow(), commGrid->GetRowWorld());
3315 std::partial_sum(recvcnt.data(), recvcnt.data()+rowProcs-1, rdpls.data()+1);
3317 IT totrecv = rdpls[rowProcs-1] + recvcnt[rowProcs-1];
3318 assert((totrecv < std::numeric_limits<int>::max()));
3320 std::vector<NT> rowvals(totrecv);
3321 MPI_Gatherv(loopvals.arr.data(), locsize, MPIType<NT>(), rowvals.data(), recvcnt.data(), rdpls.data(),
3322 MPIType<NT>(), commGrid->GetDiagOfProcRow(), commGrid->GetRowWorld());
3325 MPI_Comm DiagWorld = commGrid->GetDiagWorld();
3326 if(DiagWorld != MPI_COMM_NULL)
3328 typedef typename DER::LocalIT LIT;
3331 tuples.
AddLoops(rowvals, replaceExisting);
3333 spSeq =
new DER(tuples,
false);
3341template <
class IT,
class NT,
class DER>
3342template <
typename LIT,
typename OT>
3345 if(spSeq->getnsplit() > 0)
3347 SpParHelper::Print(
"Can not declare preallocated buffers for multithreaded execution\n", commGrid->GetWorld());
3351 typedef typename DER::LocalIT LocIT;
3354 LocIT mA = spSeq->getnrow();
3355 LocIT nA = spSeq->getncol();
3357 int p_c = commGrid->GetGridCols();
3358 int p_r = commGrid->GetGridRows();
3360 LocIT rwperproc = mA / p_c;
3361 LocIT cwperproc = nA / p_r;
3364 LocIT * colinds =
seq->GetDCSC()->jc;
3365 LocIT locnzc =
seq->getnzc();
3367 int * gsizes = NULL;
3370 std::vector<LocIT> pack2send;
3375 for(
int pid = 1; pid <= p_r; pid++)
3379 IT offset = dummyRHS.RowLenUntil(pid-1);
3380 int diagneigh = commGrid->GetComplementRank();
3381 MPI_Sendrecv(&offset, 1, MPIType<IT>(), diagneigh,
TRTAGNZ, &diagoffset, 1, MPIType<IT>(), diagneigh,
TRTAGNZ, commGrid->GetWorld(), &status);
3383 LocIT endind = (pid == p_r)? nA :
static_cast<LocIT
>(pid) * cwperproc;
3384 while(cci < locnzc && colinds[cci] < endind)
3386 pack2send.push_back(colinds[cci++]-diagoffset);
3388 if(pid-1 == myrank) gsizes =
new int[p_r];
3389 MPI_Gather(&mysize, 1, MPI_INT, gsizes, 1, MPI_INT, pid-1, commGrid->GetColWorld());
3392 IT colcnt = std::accumulate(gsizes, gsizes+p_r,
static_cast<IT>(0));
3393 recvbuf =
new IT[colcnt];
3394 dpls =
new IT[p_r]();
3395 std::partial_sum(gsizes, gsizes+p_r-1, dpls+1);
3399 MPI_Gatherv(
SpHelper::p2a(pack2send), mysize, MPIType<LocIT>(), recvbuf, gsizes, dpls, MPIType<LocIT>(), pid-1, commGrid->GetColWorld());
3400 std::vector<LocIT>().swap(pack2send);
3404 recveclen = dummyRHS.MyLocLength();
3405 std::vector< std::vector<LocIT> > service(recveclen);
3406 for(
int i=0; i< p_r; ++i)
3408 for(
int j=0; j< gsizes[i]; ++j)
3410 IT colid2update = recvbuf[dpls[i]+j];
3411 if(service[colid2update].
size() < GATHERVNEIGHLIMIT)
3413 service.push_back(i);
3423 std::vector<bool> isthere(mA,
false);
3424 std::vector<int> maxlens(p_c,0);
3426 for(
typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)
3428 for(
typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
3430 LocIT rowid = nzit.rowid();
3433 LocIT owner = std::min(nzit.rowid() / rwperproc, (LocIT) p_c-1);
3435 isthere[rowid] =
true;
3440 optbuf.
Set(maxlens,mA);
3443template <
class IT,
class NT,
class DER>
3446 spSeq->RowSplit(numsplits);
3454template <
class IT,
class NT,
class DER>
3455template <
typename SR>
3459 std::shared_ptr<CommGrid> Grid =
ProductGrid(commGrid.get(), commGrid.get(), stages, dummy, dummy);
3461 typedef typename DER::LocalIT LIT;
3463 LIT AA_m = spSeq->getnrow();
3464 LIT AA_n = spSeq->getncol();
3466 DER seqTrn = spSeq->TransposeConst();
3468 MPI_Barrier(commGrid->GetWorld());
3470 LIT ** NRecvSizes = SpHelper::allocate2D<LIT>(DER::esscount, stages);
3471 LIT ** TRecvSizes = SpHelper::allocate2D<LIT>(DER::esscount, stages);
3479 std::vector< SpTuples<LIT,NT> *> tomerge;
3481 int Nself = commGrid->GetRankInProcRow();
3482 int Tself = commGrid->GetRankInProcCol();
3484 for(
int i = 0; i < stages; ++i)
3486 std::vector<LIT> ess;
3487 if(i == Nself) NRecv = spSeq;
3490 ess.resize(DER::esscount);
3491 for(
int j=0; j< DER::esscount; ++j)
3492 ess[j] = NRecvSizes[j][i];
3499 if(i == Tself) TRecv = &seqTrn;
3502 ess.resize(DER::esscount);
3503 for(
int j=0; j< DER::esscount; ++j)
3504 ess[j] = TRecvSizes[j][i];
3509 SpTuples<LIT,NT> * AA_cont = MultiplyReturnTuples<SR, NT>(*NRecv, *TRecv,
false,
true);
3511 tomerge.push_back(AA_cont);
3513 if(i != Nself)
delete NRecv;
3514 if(i != Tself)
delete TRecv;
3521 spSeq =
new DER(MergeAll<SR>(tomerge, AA_m, AA_n),
false);
3522 for(
unsigned int i=0; i<tomerge.size(); ++i)
3527template <
class IT,
class NT,
class DER>
if(commGrid->myproccol == commGrid->myprocrow)
typedef typename DER::LocalIT LIT;
LIT locnnz = Atuples.getnnz();
LIT * rows = new LIT[locnnz];
LIT * cols = new LIT[locnnz];
NT * vals = new NT[locnnz];
for(LIT i=0; i < locnnz; ++i)
LIT locm = getlocalcols();
LIT locn = getlocalrows();
LIT remotem, remoten, remotennz;
std::swap(locm, locn);
int diagneigh = commGrid->GetComplementRank();
MPI_Sendrecv(&locnnz, 1, MPIType<LIT>(), diagneigh, TRTAGNZ, &remotennz, 1, MPIType<LIT>(), diagneigh, TRTAGNZ, commGrid->GetWorld(), &status);
MPI_Sendrecv(&locn, 1, MPIType<LIT>(), diagneigh, TRTAGM, &remotem, 1, MPIType<LIT>(), diagneigh, TRTAGM, commGrid->GetWorld(), &status);
MPI_Sendrecv(&locm, 1, MPIType<LIT>(), diagneigh, TRTAGN, &remoten, 1, MPIType<LIT>(), diagneigh, TRTAGN, commGrid->GetWorld(), &status);
LIT * rowsrecv = new LIT[remotennz];
MPI_Sendrecv(rows, locnnz, MPIType<LIT>(), diagneigh, TRTAGROWS, rowsrecv, remotennz, MPIType<LIT>(), diagneigh, TRTAGROWS, commGrid->GetWorld(), &status);
LIT * colsrecv = new LIT[remotennz];
MPI_Sendrecv(cols, locnnz, MPIType<LIT>(), diagneigh, TRTAGCOLS, colsrecv, remotennz, MPIType<LIT>(), diagneigh, TRTAGCOLS, commGrid->GetWorld(), &status);
NT * valsrecv = new NT[remotennz];
MPI_Sendrecv(vals, locnnz, MPIType<NT>(), diagneigh, TRTAGVALS, valsrecv, remotennz, MPIType<NT>(), diagneigh, TRTAGVALS, commGrid->GetWorld(), &status);
std::tuple<LIT,LIT,NT> * arrtuples = new std::tuple<LIT,LIT,NT>[remotennz];
for(LIT i=0; i< remotennz; ++i)
{
arrtuples[i] = std::make_tuple(rowsrecv[i], colsrecv[i], valsrecv[i]);
}
DeleteAll(rowsrecv, colsrecv, valsrecv);
sort(arrtuples, arrtuples+remotennz, collexicogcmp);
spSeq->Create( remotennz, remotem, remoten, arrtuples);
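// Sketch (illustration only): on the square process grid the transpose is one pairwise swap --
// process (i,j) exchanges its local tuples with process (j,i), the complement rank, and each
// side swaps row/column indices. A minimal standalone version for (row, col) index pairs
// (the function name and pair payload are hypothetical):
#include <mpi.h>
#include <vector>
#include <utility>
#include <cstdint>
std::vector<std::pair<int64_t, int64_t>>
swap_with_complement(MPI_Comm world, int complement, std::vector<std::pair<int64_t, int64_t>> mine)
{
    for(auto & p : mine) std::swap(p.first, p.second);     // local (i,j) -> (j,i)
    int sendn = (int) mine.size(), recvn = 0;
    MPI_Sendrecv(&sendn, 1, MPI_INT, complement, 0, &recvn, 1, MPI_INT, complement, 0,
                 world, MPI_STATUS_IGNORE);
    std::vector<std::pair<int64_t, int64_t>> theirs(recvn);
    MPI_Datatype pairtype;
    MPI_Type_contiguous(sizeof(std::pair<int64_t, int64_t>), MPI_CHAR, &pairtype);
    MPI_Type_commit(&pairtype);
    MPI_Sendrecv(mine.data(), sendn, pairtype, complement, 1,
                 theirs.data(), recvn, pairtype, complement, 1, world, MPI_STATUS_IGNORE);
    MPI_Type_free(&pairtype);
    return theirs;    // tuples this process owns after the transpose
}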
3588template <
class IT,
class NT,
class DER>
3589template <
class HANDLER>
3592 int proccols = commGrid->GetGridCols();
3593 int procrows = commGrid->GetGridRows();
3594 IT totalm = getnrow();
3595 IT totaln = getncol();
3596 IT totnnz = getnnz();
3599 if(commGrid->GetRank() == 0)
3602 std::stringstream strm;
3603 strm <<
"%%MatrixMarket matrix coordinate real general" << std::endl;
3604 strm << totalm <<
" " << totaln <<
" " << totnnz << std::endl;
3606 out.open(filename.c_str(),std::ios_base::trunc);
3607 flinelen = s.length();
3608 out.write(s.c_str(), flinelen);
3611 int colrank = commGrid->GetRankInProcCol();
3612 int colneighs = commGrid->GetGridRows();
3613 IT * locnrows =
new IT[colneighs];
3614 locnrows[colrank] = (
IT) getlocalrows();
3615 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locnrows, 1, MPIType<IT>(), commGrid->GetColWorld());
3616 IT roffset = std::accumulate(locnrows, locnrows+colrank, 0);
3619 MPI_Datatype datatype;
3620 MPI_Type_contiguous(
sizeof(std::pair<IT,NT>), MPI_CHAR, &datatype);
3621 MPI_Type_commit(&datatype);
3623 for(
int i = 0; i < procrows; i++)
3625 if(commGrid->GetRankInProcCol() == i)
3627 IT localrows = spSeq->getnrow();
3628 std::vector< std::vector< std::pair<IT,NT> > > csr(localrows);
3629 if(commGrid->GetRankInProcRow() == 0)
3631 IT localcols = spSeq->getncol();
3632 MPI_Bcast(&localcols, 1, MPIType<IT>(), 0, commGrid->GetRowWorld());
3633 for(
typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)
3635 for(
typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
3637 csr[nzit.rowid()].push_back( std::make_pair(colit.colid(), nzit.value()) );
3644 MPI_Bcast(&n_perproc, 1, MPIType<IT>(), 0, commGrid->GetRowWorld());
3645 IT noffset = commGrid->GetRankInProcRow() * n_perproc;
3646 for(
typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)
3648 for(
typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
3650 csr[nzit.rowid()].push_back( std::make_pair(colit.colid() + noffset, nzit.value()) );
3654 std::pair<IT,NT> * ents = NULL;
3655 int * gsizes = NULL, * dpls = NULL;
3656 if(commGrid->GetRankInProcRow() == 0)
3658 out.open(filename.c_str(),std::ios_base::app);
3659 gsizes =
new int[proccols];
3660 dpls =
new int[proccols]();
3662 for(
int j = 0; j < localrows; ++j)
3665 sort(csr[j].begin(), csr[j].end());
3666 int mysize = csr[j].size();
3667 MPI_Gather(&mysize, 1, MPI_INT, gsizes, 1, MPI_INT, 0, commGrid->GetRowWorld());
3668 if(commGrid->GetRankInProcRow() == 0)
3670 rowcnt = std::accumulate(gsizes, gsizes+proccols,
static_cast<IT>(0));
3671 std::partial_sum(gsizes, gsizes+proccols-1, dpls+1);
3672 ents =
new std::pair<IT,NT>[rowcnt];
3677 MPI_Gatherv(
SpHelper::p2a(csr[j]), mysize, datatype, ents, gsizes, dpls, datatype, 0, commGrid->GetRowWorld());
3678 if(commGrid->GetRankInProcRow() == 0)
3680 for(
int k=0; k< rowcnt; ++k)
3685 out << j + roffset + 1 <<
"\t" << ents[k].first + 1 <<
"\t";
3688 out << ents[k].first + 1 <<
"\t" << j + roffset + 1 <<
"\t";
3689 handler.save(out, ents[k].second, j + roffset, ents[k].first);
3695 if(commGrid->GetRankInProcRow() == 0)
3701 MPI_Barrier(commGrid->GetWorld());
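// Sketch (illustration only): the gathered writer above emits the canonical Matrix Market form,
// a banner, a "rows cols nnz" line, then one 1-based "row col value" triple per line (hence the
// "+ 1" on the indices). A tiny serial writer for the same format, to make the index convention
// concrete (the function name is hypothetical):
#include <fstream>
#include <string>
#include <vector>
#include <tuple>
#include <cstdint>
void write_matrix_market(const std::string & path, int64_t nrows, int64_t ncols,
                         const std::vector<std::tuple<int64_t, int64_t, double>> & triples)
{
    std::ofstream out(path);
    out << "%%MatrixMarket matrix coordinate real general\n";
    out << nrows << " " << ncols << " " << triples.size() << "\n";
    for(const auto & t : triples)     // input triples are 0-based; the file is 1-based
        out << std::get<0>(t) + 1 << "\t" << std::get<1>(t) + 1 << "\t" << std::get<2>(t) << "\n";
}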
3708template <
class IT,
class NT,
class DER>
3712 int myrank = commGrid->GetRank();
3713 int nprocs = commGrid->GetSize();
3715 MPI_Offset fpos, end_fpos;
3717 if (stat(filename.c_str(), &st) == -1)
3719 MPI_Abort(MPI_COMM_WORLD,
NOFILE);
3721 int64_t file_size = st.st_size;
3724 std::cout <<
"File is " << file_size <<
" bytes" << std::endl;
3726 fpos = myrank * file_size /
nprocs;
3728 if(myrank != (
nprocs-1)) end_fpos = (myrank + 1) * file_size /
nprocs;
3729 else end_fpos = file_size;
3732 MPI_File_open (commGrid->commWorld,
const_cast<char*
>(filename.c_str()), MPI_MODE_RDONLY, MPI_INFO_NULL, &mpi_fh);
3734 typedef std::map<std::string, uint64_t> KEYMAP;
3735 std::vector< KEYMAP > allkeys(
nprocs);
3737 std::vector<std::string> lines;
3739 int64_t entriesread = lines.size();
3746 entriesread += lines.size();
3750 MPI_Reduce(&entriesread, &allentriesread, 1,
MPIType<int64_t>(), MPI_SUM, 0, commGrid->commWorld);
3751#ifdef COMBBLAS_DEBUG
3753 std::cout <<
"Initial reading finished. Total number of entries read across all processors is " << allentriesread << std::endl;
3756 int * sendcnt =
new int[
nprocs];
3757 int * recvcnt =
new int[
nprocs];
3758 for(
int i=0; i<
nprocs; ++i)
3759 sendcnt[i] = allkeys[i].
size();
3761 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld());
3762 int * sdispls =
new int[
nprocs]();
3763 int * rdispls =
new int[
nprocs]();
3764 std::partial_sum(sendcnt, sendcnt+
nprocs-1, sdispls+1);
3765 std::partial_sum(recvcnt, recvcnt+
nprocs-1, rdispls+1);
3766 totsend = std::accumulate(sendcnt,sendcnt+
nprocs,
static_cast<IT>(0));
3767 IT totrecv = std::accumulate(recvcnt,recvcnt+
nprocs,
static_cast<IT>(0));
3769 assert((totsend < std::numeric_limits<int>::max()));
3770 assert((totrecv < std::numeric_limits<int>::max()));
3775 senddata =
new TYPE2SEND[totsend];
3777 #pragma omp parallel for
3778 for(
int i=0; i<
nprocs; ++i)
3781 for(
auto pobj:allkeys[i])
3784 std::array<char, MAXVERTNAME> vname;
3785 std::copy( pobj.first.begin(), pobj.first.end(), vname.begin() );
3786 if(pobj.first.length() <
MAXVERTNAME) vname[pobj.first.length()] =
'\0';
3788 senddata[sdispls[i]+j] = TYPE2SEND(vname, pobj.second);
3794 MPI_Datatype MPI_HASH;
3795 MPI_Type_contiguous(
sizeof(TYPE2SEND), MPI_CHAR, &MPI_HASH);
3796 MPI_Type_commit(&MPI_HASH);
3798 TYPE2SEND * recvdata =
new TYPE2SEND[totrecv];
3799 MPI_Alltoallv(senddata, sendcnt, sdispls, MPI_HASH, recvdata, recvcnt, rdispls, MPI_HASH, commGrid->GetWorld());
3802 std::set< std::pair<uint64_t, std::string> > uniqsorted;
3803 for(
IT i=0; i< totrecv; ++i)
3805 auto locnull = std::find(recvdata[i].first.begin(), recvdata[i].first.end(),
'\0');
3806 std::string strtmp(recvdata[i].first.begin(), locnull);
3808 uniqsorted.insert(std::make_pair(recvdata[i].second, strtmp));
uint64_t uniqsize = uniqsorted.size();
#ifdef COMBBLAS_DEBUG
std::cout << "out of " << totrecv << " vertices received, " << uniqsize << " were unique" << std::endl;
#endif
MPI_Exscan( &uniqsize, &sizeuntil, 1, MPIType<uint64_t>(), MPI_SUM, commGrid->GetWorld() );
MPI_Allreduce(&uniqsize, &totallength, 1, MPIType<uint64_t>(), MPI_SUM, commGrid->GetWorld());
if(myrank == 0) sizeuntil = 0;
distmapper = FullyDistVec<IT,STRASARRAY>(commGrid, totallength, STRASARRAY{});
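// Sketch (illustration only): the string labels receive contiguous numeric ids through an
// exclusive prefix sum -- each rank counts its unique labels, MPI_Exscan gives the number of
// uniques on lower ranks, and local position plus that offset is the global id (rank 0's
// Exscan output is undefined, hence the reset above). Standalone version of the id-assignment
// step (the function name is hypothetical):
#include <mpi.h>
#include <cstdint>
uint64_t first_global_id(MPI_Comm comm, uint64_t my_unique_count, uint64_t & total_out)
{
    int myrank;
    MPI_Comm_rank(comm, &myrank);
    uint64_t before = 0;
    MPI_Exscan(&my_unique_count, &before, 1, MPI_UINT64_T, MPI_SUM, comm);
    if(myrank == 0) before = 0;       // Exscan leaves rank 0's output undefined
    MPI_Allreduce(&my_unique_count, &total_out, 1, MPI_UINT64_T, MPI_SUM, comm);
    return before;                    // this rank owns ids [before, before + my_unique_count)
}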
3827 std::vector< std::vector< IT > > locs_send(
nprocs);
3828 std::vector< std::vector< std::string > > data_send(
nprocs);
3829 int * map_scnt =
new int[
nprocs]();
3830 for(
auto itr = uniqsorted.begin(); itr != uniqsorted.end(); ++itr)
3832 uint64_t globalindex = sizeuntil + locindex;
3833 invindex.insert(std::make_pair(itr->second, globalindex));
3836 int owner = distmapper.Owner(globalindex, newlocid);
3841 locs_send[owner].push_back(newlocid);
3842 data_send[owner].push_back(itr->second);
3853 for(
IT i=0; i< totrecv; ++i)
3855 auto locnull = std::find(recvdata[i].first.begin(), recvdata[i].first.end(),
'\0');
3856 std::string searchstr(recvdata[i].first.begin(), locnull);
3858 auto resp = invindex.find(searchstr);
3859 if (resp != invindex.end())
3861 recvdata[i].second = resp->second;
3864 std::cout <<
"Assertion failed at proc " << myrank <<
": the absence of the entry in invindex is unexpected!!!" << std::endl;
3866 MPI_Alltoallv(recvdata, recvcnt, rdispls, MPI_HASH, senddata, sendcnt, sdispls, MPI_HASH, commGrid->GetWorld());
3867 DeleteAll(recvdata, sendcnt, recvcnt, sdispls, rdispls);
3868 MPI_Type_free(&MPI_HASH);
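// Sketch (illustration only): vertex names travel through MPI as fixed-width char arrays paired
// with a 64-bit id, committed as sizeof(TYPE2SEND) MPI_CHARs; names shorter than the buffer are
// '\0'-padded on send and recovered with std::find on receive, as in the loops above. A
// standalone round trip of that packing (kMaxVertName stands in for the real MAXVERTNAME):
#include <array>
#include <string>
#include <algorithm>
#include <cstddef>
constexpr std::size_t kMaxVertName = 64;
std::array<char, kMaxVertName> pack_name(const std::string & s)
{
    std::array<char, kMaxVertName> buf{};   // value-initialized: all '\0'
    std::copy(s.begin(), s.begin() + std::min(s.size(), kMaxVertName), buf.begin());
    return buf;
}
std::string unpack_name(const std::array<char, kMaxVertName> & buf)
{
    auto locnull = std::find(buf.begin(), buf.end(), '\0');
    return std::string(buf.begin(), locnull);   // stops at the padding, or uses the full width
}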
3880template <
class IT,
class NT,
class DER>
3881template <
typename _BinaryOperation>
3884 int myrank = commGrid->GetRank();
3885 int nprocs = commGrid->GetSize();
3886 TYPE2SEND * senddata;
3891 MPI_File mpi_fh = TupleRead1stPassNExchange(filename, senddata, totsend, distmapper, totallength);
3893 typedef std::map<std::string, uint64_t> KEYMAP;
3894 KEYMAP ultimateperm;
3895 for(
IT i=0; i< totsend; ++i)
3897 auto locnull = std::find(senddata[i].first.begin(), senddata[i].first.end(),
'\0');
3899 std::string searchstr(senddata[i].first.begin(), locnull);
3900 auto ret = ultimateperm.emplace(std::make_pair(searchstr, senddata[i].second));
3904 std::cout <<
"the duplication in ultimateperm is unexpected!!!" << std::endl;
3910 MPI_Offset fpos, end_fpos;
3912 if (stat(filename.c_str(), &st) == -1)
3914 MPI_Abort(MPI_COMM_WORLD,
NOFILE);
3916 int64_t file_size = st.st_size;
3918 fpos = myrank * file_size /
nprocs;
3920 if(myrank != (
nprocs-1)) end_fpos = (myrank + 1) * file_size /
nprocs;
3921 else end_fpos = file_size;
3923 typedef typename DER::LocalIT LIT;
3924 std::vector<LIT> rows;
3925 std::vector<LIT> cols;
3926 std::vector<NT> vals;
3928 std::vector<std::string> lines;
3930 int64_t entriesread = lines.size();
3937 entriesread += lines.size();
3941 MPI_Reduce(&entriesread, &allentriesread, 1,
MPIType<int64_t>(), MPI_SUM, 0, commGrid->commWorld);
3942#ifdef COMBBLAS_DEBUG
3944 std::cout <<
"Second reading finished. Total number of entries read across all processors is " << allentriesread << std::endl;
3947 MPI_File_close(&mpi_fh);
3948 std::vector< std::vector < std::tuple<LIT,LIT,NT> > > data(
nprocs);
3950 LIT locsize = rows.size();
3951 for(LIT i=0; i<locsize; ++i)
3954 int owner = Owner(totallength, totallength, rows[i], cols[i], lrow, lcol);
3955 data[owner].push_back(std::make_tuple(lrow,lcol,vals[i]));
3957 std::vector<LIT>().swap(rows);
3958 std::vector<LIT>().swap(cols);
3959 std::vector<NT>().swap(vals);
3961#ifdef COMBBLAS_DEBUG
3963 std::cout <<
"Packing to recipients finished, about to send..." << std::endl;
3966 if(spSeq)
delete spSeq;
3967 SparseCommon(data, locsize, totallength, totallength, BinOp);
3978template <
class IT,
class NT,
class DER>
3979template <
typename _BinaryOperation>
3984 int64_t nrows, ncols, nonzeros;
3988 int myrank = commGrid->GetRank();
3989 int nprocs = commGrid->GetSize();
if ((f = fopen(filename.c_str(), "r")) == NULL)
{
printf("COMBBLAS: Matrix-market file %s cannot be found\n", filename.c_str());
MPI_Abort(MPI_COMM_WORLD, NOFILE);
}
printf("Could not process Matrix Market banner.\n");
printf("Sorry, this application does not support complex types\n");
std::cout << "Matrix is Float" << std::endl;
std::cout << "Matrix is Integer" << std::endl;
std::cout << "Matrix is Boolean" << std::endl;
4027 std::cout <<
"Matrix is symmetric" << std::endl;
4034 std::cout <<
"Total number of nonzeros expected across all processors is " << nonzeros << std::endl;
4037 MPI_Bcast(&type, 1, MPI_INT, 0, commGrid->commWorld);
4038 MPI_Bcast(&symmetric, 1, MPI_INT, 0, commGrid->commWorld);
4045 if (stat(filename.c_str(), &st) == -1)
4047 MPI_Abort(MPI_COMM_WORLD,
NOFILE);
4049 int64_t file_size = st.st_size;
4050 MPI_Offset fpos, end_fpos, endofheader;
4051 if(commGrid->GetRank() == 0)
4053 std::cout <<
"File is " << file_size <<
" bytes" << std::endl;
4056 MPI_Bcast(&endofheader, 1, MPIType<MPI_Offset>(), 0, commGrid->commWorld);
4061 MPI_Bcast(&endofheader, 1, MPIType<MPI_Offset>(), 0, commGrid->commWorld);
4062 fpos = endofheader + myrank * (file_size-endofheader) /
nprocs;
4064 if(myrank != (
nprocs-1)) end_fpos = endofheader + (myrank + 1) * (file_size-endofheader) /
nprocs;
4065 else end_fpos = file_size;
4068 MPI_File_open (commGrid->commWorld,
const_cast<char*
>(filename.c_str()), MPI_MODE_RDONLY, MPI_INFO_NULL, &mpi_fh);
4071 typedef typename DER::LocalIT LIT;
4072 std::vector<LIT> rows;
4073 std::vector<LIT> cols;
4074 std::vector<NT> vals;
4076 std::vector<std::string> lines;
4078 int64_t entriesread = lines.size();
4080 MPI_Barrier(commGrid->commWorld);
4085 entriesread += lines.size();
4089 MPI_Reduce(&entriesread, &allentriesread, 1,
MPIType<int64_t>(), MPI_SUM, 0, commGrid->commWorld);
4090#ifdef COMBBLAS_DEBUG
4092 std::cout <<
"Reading finished. Total number of entries read across all processors is " << allentriesread << std::endl;
4095 std::vector< std::vector < std::tuple<LIT,LIT,NT> > > data(
nprocs);
4097 LIT locsize = rows.size();
4098 for(LIT i=0; i<locsize; ++i)
4101 int owner = Owner(nrows, ncols, rows[i], cols[i], lrow, lcol);
4102 data[owner].push_back(std::make_tuple(lrow,lcol,vals[i]));
4104 std::vector<LIT>().swap(rows);
4105 std::vector<LIT>().swap(cols);
4106 std::vector<NT>().swap(vals);
4108#ifdef COMBBLAS_DEBUG
std::cout << "Packing to recipients finished, about to send..." << std::endl;
4113 if(spSeq)
delete spSeq;
4114 SparseCommon(data, locsize, nrows, ncols, BinOp);
4118template <
class IT,
class NT,
class DER>
4119template <
class HANDLER>
4122 int myrank = commGrid->GetRank();
4123 int nprocs = commGrid->GetSize();
4124 IT totalm = getnrow();
4125 IT totaln = getncol();
4126 IT totnnz = getnnz();
4128 std::stringstream ss;
4131 ss <<
"%%MatrixMarket matrix coordinate real general" << std::endl;
4132 ss << totalm <<
" " << totaln <<
" " << totnnz << std::endl;
4135 IT entries = getlocalnnz();
4137 MPI_Exscan( &entries, &sizeuntil, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld() );
4138 if(myrank == 0) sizeuntil = 0;
4142 GetPlaceInGlobalGrid(roffset, coffset);
4149 for(
typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)
4151 for(
typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
4153 IT glrowid = nzit.rowid() + roffset;
4154 IT glcolid = colit.colid() + coffset;
4155 ss << glrowid <<
'\t';
4156 ss << glcolid <<
'\t';
4157 handler.save(ss, nzit.value(), glrowid, glcolid);
std::string text = ss.str();
bytes[myrank] = text.size();
int64_t bytesuntil = std::accumulate(bytes, bytes+myrank, static_cast<int64_t>(0));
MPI_File_open(commGrid->GetWorld(), (char*) filename.c_str(), MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &thefile);
int mpi_err = MPI_File_set_view(thefile, bytesuntil, MPI_CHAR, MPI_CHAR, (char*) "external32", MPI_INFO_NULL);
if (mpi_err == 51)
{
MPI_File_set_view(thefile, bytesuntil, MPI_CHAR, MPI_CHAR, (char*) "native", MPI_INFO_NULL);
}
int64_t batchSize = 256 * 1024 * 1024;
size_t localfileptr = 0;
int64_t remaining = bytes[myrank];
int64_t totalremaining = bytestotal;
while(totalremaining > 0)
{
#ifdef COMBBLAS_DEBUG
std::cout << "Remaining " << totalremaining << " bytes to write in aggregate" << std::endl;
#endif
int curBatch = std::min(batchSize, remaining);
MPI_File_write_all(thefile, text.c_str()+localfileptr, curBatch, MPI_CHAR, &status);
MPI_Get_count(&status, MPI_CHAR, &count);
assert( (curBatch == 0) || (count == curBatch) );
localfileptr += curBatch;
remaining -= curBatch;
MPI_Allreduce(&remaining, &totalremaining, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetWorld());
}
MPI_File_close(&thefile);
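// Sketch (illustration only): the loop above caps each MPI_File_write_all at 256 MB and keeps
// iterating until the *global* number of remaining bytes is zero, because write_all is
// collective: ranks that have drained their buffer must still join every round with a
// zero-length write. Standalone version of that convention (the helper name is hypothetical;
// the file view is assumed to be set beforehand, as above):
#include <mpi.h>
#include <string>
#include <algorithm>
#include <cstddef>
#include <cstdint>
void collective_append(MPI_File fh, const std::string & text, MPI_Comm comm)
{
    const int64_t batch = 256LL * 1024 * 1024;
    int64_t remaining = (int64_t) text.size(), totalremaining = 0;
    std::size_t offset = 0;
    MPI_Allreduce(&remaining, &totalremaining, 1, MPI_INT64_T, MPI_SUM, comm);
    while(totalremaining > 0)
    {
        int cur = (int) std::min(batch, remaining);    // 0 once this rank has nothing left
        MPI_File_write_all(fh, text.data() + offset, cur, MPI_CHAR, MPI_STATUS_IGNORE);
        offset += cur;
        remaining -= cur;
        MPI_Allreduce(&remaining, &totalremaining, 1, MPI_INT64_T, MPI_SUM, comm);
    }
}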
4209template <
class IT,
class NT,
class DER>
4210template <
class HANDLER>
4214 TAU_PROFILE_TIMER(rdtimer,
"ReadDistribute",
"void SpParMat::ReadDistribute (const string & , int, bool, HANDLER, bool)", TAU_DEFAULT);
4215 TAU_PROFILE_START(rdtimer);
4218 std::ifstream infile;
4219 FILE * binfile = NULL;
4222 if(commGrid->GetRank() == master)
4224 hfile =
ParseHeader(filename, binfile, seeklength);
4226 MPI_Bcast(&seeklength, 1, MPI_INT, master, commGrid->commWorld);
4228 IT total_m, total_n, total_nnz;
4229 IT m_perproc = 0, n_perproc = 0;
4231 int colneighs = commGrid->GetGridRows();
4232 int rowneighs = commGrid->GetGridCols();
4242 buffpercolneigh /= colneighs;
4244 SpParHelper::Print(
"COMBBLAS: Parallel I/O requested but binary header is corrupted\n", commGrid->GetWorld());
4250 buffperrowneigh = std::max(buffperrowneigh, buffpercolneigh);
4251 if(std::max(buffpercolneigh * colneighs, buffperrowneigh * rowneighs) > std::numeric_limits<int>::max())
4253 SpParHelper::Print(
"COMBBLAS: MPI doesn't support sending int64_t send/recv counts or displacements\n", commGrid->GetWorld());
4256 int * cdispls =
new int[colneighs];
4257 for (
IT i=0; i<colneighs; ++i) cdispls[i] = i*buffpercolneigh;
4258 int * rdispls =
new int[rowneighs];
4259 for (
IT i=0; i<rowneighs; ++i) rdispls[i] = i*buffperrowneigh;
4261 int *ccurptrs = NULL, *rcurptrs = NULL;
4268 int rankincol = commGrid->GetRankInProcCol(master);
4269 int rankinrow = commGrid->GetRankInProcRow(master);
4270 std::vector< std::tuple<IT, IT, NT> > localtuples;
4272 if(commGrid->GetRank() == master)
4277 total_n = 0; total_m = 0;
4278 BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
4283 SpParHelper::Print(
"COMBBLAS: Ascii input with binary headers is not supported\n", commGrid->GetWorld());
4284 total_n = 0; total_m = 0;
4285 BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
4290 infile.open(filename.c_str());
4292 infile.getline(comment,256);
4293 while(comment[0] ==
'%')
4295 infile.getline(comment,256);
4297 std::stringstream ss;
4298 ss << std::string(comment);
4299 ss >> total_m >> total_n >> total_nnz;
4302 SpParHelper::Print(
"COMBBLAS: Trying to read binary headerless file in parallel, aborting\n", commGrid->GetWorld());
4303 total_n = 0; total_m = 0;
4304 BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
4312 total_nnz = hfile.
nnz;
4314 m_perproc = total_m / colneighs;
4315 n_perproc = total_n / rowneighs;
4316 BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
4317 AllocateSetBuffers(rows, cols, vals, rcurptrs, ccurptrs, rowneighs, colneighs, buffpercolneigh);
4319 if(seeklength > 0 && pario)
4321 IT entriestoread = total_nnz / colneighs;
4324 commGrid->OpenDebugFile(
"Read", oput);
4325 oput <<
"Total nnz: " << total_nnz <<
" entries to read: " << entriestoread << std::endl;
4328 ReadAllMine(binfile, rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls, m_perproc, n_perproc,
4329 rowneighs, colneighs, buffperrowneigh, buffpercolneigh, entriestoread, handler, rankinrow, transpose);
4333 IT temprow, tempcol;
4335 IT ntrow = 0, ntcol = 0;
4337 bool nonumline = nonum;
4339 for(; cnz < total_nnz; ++cnz)
4343 std::stringstream linestream;
4347 infile.getline(line, 1024);
4349 linestream >> temprow >> tempcol;
4353 linestream >> std::skipws;
4354 nonumline = linestream.eof();
4363 handler.binaryfill(binfile, temprow , tempcol, tempval);
4371 colrec = std::min(
static_cast<int>(temprow / m_perproc), colneighs-1);
4372 commonindex = colrec * buffpercolneigh + ccurptrs[colrec];
4374 rows[ commonindex ] = temprow;
4375 cols[ commonindex ] = tempcol;
4378 vals[ commonindex ] = nonumline ? handler.getNoNum(ntrow, ntcol) : handler.read(linestream, ntrow, ntcol);
4382 vals[ commonindex ] = tempval;
4384 ++ (ccurptrs[colrec]);
4385 if(ccurptrs[colrec] == buffpercolneigh || (cnz == (total_nnz-1)) )
4387 MPI_Scatter(ccurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankincol, commGrid->colWorld);
4390 IT * temprows =
new IT[recvcount];
4391 IT * tempcols =
new IT[recvcount];
4392 NT * tempvals =
new NT[recvcount];
4395 MPI_Scatterv(rows, ccurptrs, cdispls, MPIType<IT>(), temprows, recvcount, MPIType<IT>(), rankincol, commGrid->colWorld);
4396 MPI_Scatterv(cols, ccurptrs, cdispls, MPIType<IT>(), tempcols, recvcount, MPIType<IT>(), rankincol, commGrid->colWorld);
4397 MPI_Scatterv(vals, ccurptrs, cdispls, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), rankincol, commGrid->colWorld);
4399 std::fill_n(ccurptrs, colneighs, 0);
4402 HorizontalSend(rows, cols, vals,temprows, tempcols, tempvals, localtuples, rcurptrs, rdispls,
4403 buffperrowneigh, rowneighs, recvcount, m_perproc, n_perproc, rankinrow);
4405 if( cnz != (total_nnz-1) )
4408 rows =
new IT [ buffpercolneigh * colneighs ];
4409 cols =
new IT [ buffpercolneigh * colneighs ];
4410 vals =
new NT [ buffpercolneigh * colneighs ];
4414 assert (cnz == total_nnz);
4417 std::fill_n(ccurptrs, colneighs, std::numeric_limits<int>::max());
4418 MPI_Scatter(ccurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankincol, commGrid->colWorld);
4421 std::fill_n(rcurptrs, rowneighs, std::numeric_limits<int>::max());
4422 MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
4425 else if( commGrid->OnSameProcCol(master) )
4427 BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
4428 m_perproc = total_m / colneighs;
4429 n_perproc = total_n / rowneighs;
4431 if(seeklength > 0 && pario)
4433 binfile = fopen(filename.c_str(),
"rb");
4434 IT entrysize = handler.entrylength();
4435 int myrankincol = commGrid->GetRankInProcCol();
4436 IT perreader = total_nnz / colneighs;
4437 IT read_offset = entrysize *
static_cast<IT>(myrankincol) * perreader + seeklength;
4438 IT entriestoread = perreader;
4439 if (myrankincol == colneighs-1)
4440 entriestoread = total_nnz -
static_cast<IT>(myrankincol) * perreader;
4441 fseek(binfile, read_offset, SEEK_SET);
4445 commGrid->OpenDebugFile(
"Read", oput);
4446 oput <<
"Total nnz: " << total_nnz <<
" OFFSET : " << read_offset <<
" entries to read: " << entriestoread << std::endl;
4450 AllocateSetBuffers(rows, cols, vals, rcurptrs, ccurptrs, rowneighs, colneighs, buffpercolneigh);
4451 ReadAllMine(binfile, rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls, m_perproc, n_perproc,
4452 rowneighs, colneighs, buffperrowneigh, buffpercolneigh, entriestoread, handler, rankinrow, transpose);
4456 while(total_n > 0 || total_m > 0)
4466 MPI_Scatter(ccurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankincol, commGrid->colWorld);
4467 if( recvcount == std::numeric_limits<int>::max())
break;
4470 IT * temprows =
new IT[recvcount];
4471 IT * tempcols =
new IT[recvcount];
4472 NT * tempvals =
new NT[recvcount];
4475 MPI_Scatterv(rows, ccurptrs, cdispls, MPIType<IT>(), temprows, recvcount, MPIType<IT>(), rankincol, commGrid->colWorld);
4476 MPI_Scatterv(cols, ccurptrs, cdispls, MPIType<IT>(), tempcols, recvcount, MPIType<IT>(), rankincol, commGrid->colWorld);
4477 MPI_Scatterv(vals, ccurptrs, cdispls, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), rankincol, commGrid->colWorld);
4480 rcurptrs =
new int[rowneighs];
4481 std::fill_n(rcurptrs, rowneighs, 0);
4484 HorizontalSend(rows, cols, vals,temprows, tempcols, tempvals, localtuples, rcurptrs, rdispls,
4485 buffperrowneigh, rowneighs, recvcount, m_perproc, n_perproc, rankinrow);
4490 std::fill_n(rcurptrs, rowneighs, std::numeric_limits<int>::max());
4491 MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
4496 BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
4498 m_perproc = total_m / colneighs;
4499 n_perproc = total_n / rowneighs;
4500 while(total_n > 0 || total_m > 0)
4503 MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
4504 if( recvcount == std::numeric_limits<int>::max())
4509 commGrid->OpenDebugFile(
"Read", oput);
4510 oput <<
"Total nnz: " << total_nnz <<
" total_m : " << total_m <<
" recvcount: " << recvcount << std::endl;
4515 IT * temprows =
new IT[recvcount];
4516 IT * tempcols =
new IT[recvcount];
4517 NT * tempvals =
new NT[recvcount];
4519 MPI_Scatterv(rows, rcurptrs, rdispls, MPIType<IT>(), temprows, recvcount, MPIType<IT>(), rankinrow, commGrid->rowWorld);
4520 MPI_Scatterv(cols, rcurptrs, rdispls, MPIType<IT>(), tempcols, recvcount, MPIType<IT>(), rankinrow, commGrid->rowWorld);
4521 MPI_Scatterv(vals, rcurptrs, rdispls, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), rankinrow, commGrid->rowWorld);
4524 IT moffset = commGrid->myprocrow * m_perproc;
4525 IT noffset = commGrid->myproccol * n_perproc;
4527 for(
IT i=0; i< recvcount; ++i)
4529 localtuples.push_back( std::make_tuple(temprows[i]-moffset, tempcols[i]-noffset, tempvals[i]) );
4535 std::tuple<IT,IT,NT> * arrtuples =
new std::tuple<IT,IT,NT>[localtuples.size()];
4536 std::copy(localtuples.begin(), localtuples.end(), arrtuples);
4538 IT localm = (commGrid->myprocrow != (commGrid->grrows-1))? m_perproc: (total_m - (m_perproc * (commGrid->grrows-1)));
4539 IT localn = (commGrid->myproccol != (commGrid->grcols-1))? n_perproc: (total_n - (n_perproc * (commGrid->grcols-1)));
4540 spSeq->Create( localtuples.size(), localm, localn, arrtuples);
4543 TAU_PROFILE_STOP(rdtimer);
4548template <
class IT,
class NT,
class DER>
4552 rows =
new IT [ buffpercolneigh * colneighs ];
4553 cols =
new IT [ buffpercolneigh * colneighs ];
4554 vals =
new NT [ buffpercolneigh * colneighs ];
4556 ccurptrs =
new int[colneighs];
4557 rcurptrs =
new int[rowneighs];
4558 std::fill_n(ccurptrs, colneighs, 0);
4559 std::fill_n(rcurptrs, rowneighs, 0);
4562template <
class IT,
class NT,
class DER>
4563void SpParMat<IT,NT,DER>::BcastEssentials(MPI_Comm & world,
IT & total_m,
IT & total_n,
IT & total_nnz,
int master)
4565 MPI_Bcast(&total_m, 1, MPIType<IT>(), master, world);
4566 MPI_Bcast(&total_n, 1, MPIType<IT>(), master, world);
4567 MPI_Bcast(&total_nnz, 1, MPIType<IT>(), master, world);
4574template <
class IT,
class NT,
class DER>
4575void SpParMat<IT,NT,DER>::VerticalSend(
IT * & rows,
IT * & cols,
NT * & vals, std::vector< std::tuple<IT,IT,NT> > & localtuples,
int * rcurptrs,
int * ccurptrs,
int * rdispls,
int * cdispls,
4576 IT m_perproc,
IT n_perproc,
int rowneighs,
int colneighs,
IT buffperrowneigh,
IT buffpercolneigh,
int rankinrow)
4579 int * colrecvdispls =
new int[colneighs];
4580 int * colrecvcounts =
new int[colneighs];
4581 MPI_Alltoall(ccurptrs, 1, MPI_INT, colrecvcounts, 1, MPI_INT, commGrid->colWorld);
4582 int totrecv = std::accumulate(colrecvcounts,colrecvcounts+colneighs,0);
4583 colrecvdispls[0] = 0;
4584 for(
int i=0; i<colneighs-1; ++i)
4585 colrecvdispls[i+1] = colrecvdispls[i] + colrecvcounts[i];
4588 IT * temprows =
new IT[totrecv];
4589 IT * tempcols =
new IT[totrecv];
4590 NT * tempvals =
new NT[totrecv];
4593 MPI_Alltoallv(rows, ccurptrs, cdispls, MPIType<IT>(), temprows, colrecvcounts, colrecvdispls, MPIType<IT>(), commGrid->colWorld);
4594 MPI_Alltoallv(cols, ccurptrs, cdispls, MPIType<IT>(), tempcols, colrecvcounts, colrecvdispls, MPIType<IT>(), commGrid->colWorld);
4595 MPI_Alltoallv(vals, ccurptrs, cdispls, MPIType<NT>(), tempvals, colrecvcounts, colrecvdispls, MPIType<NT>(), commGrid->colWorld);
4598 std::fill_n(ccurptrs, colneighs, 0);
4599 DeleteAll(colrecvdispls, colrecvcounts);
4603 HorizontalSend(rows, cols, vals,temprows, tempcols, tempvals, localtuples, rcurptrs, rdispls, buffperrowneigh, rowneighs, totrecv, m_perproc, n_perproc, rankinrow);
4606 rows =
new IT [ buffpercolneigh * colneighs ];
4607 cols =
new IT [ buffpercolneigh * colneighs ];
4608 vals =
new NT [ buffpercolneigh * colneighs ];
4618template <
class IT,
class NT,
class DER>
4619template <
class HANDLER>
4620void SpParMat<IT,NT,DER>::ReadAllMine(FILE * binfile,
IT * & rows,
IT * & cols,
NT * & vals, std::vector< std::tuple<IT,IT,NT> > & localtuples,
int * rcurptrs,
int * ccurptrs,
int * rdispls,
int * cdispls,
4621 IT m_perproc,
IT n_perproc,
int rowneighs,
int colneighs,
IT buffperrowneigh,
IT buffpercolneigh,
IT entriestoread, HANDLER handler,
int rankinrow,
bool transpose)
4623 assert(entriestoread != 0);
4625 IT temprow, tempcol;
4627 int finishedglobal = 1;
4628 while(cnz < entriestoread && !feof(binfile))
4630 handler.binaryfill(binfile, temprow , tempcol, tempval);
4638 int colrec = std::min(
static_cast<int>(temprow / m_perproc), colneighs-1);
4639 size_t commonindex = colrec * buffpercolneigh + ccurptrs[colrec];
4640 rows[ commonindex ] = temprow;
4641 cols[ commonindex ] = tempcol;
4642 vals[ commonindex ] = tempval;
4643 ++ (ccurptrs[colrec]);
4644 if(ccurptrs[colrec] == buffpercolneigh || (cnz == (entriestoread-1)) )
4648 commGrid->OpenDebugFile(
"Read", oput);
4649 oput <<
"To column neighbors: ";
4650 std::copy(ccurptrs, ccurptrs+colneighs, std::ostream_iterator<int>(oput,
" ")); oput << std::endl;
4654 VerticalSend(rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls, m_perproc, n_perproc,
4655 rowneighs, colneighs, buffperrowneigh, buffpercolneigh, rankinrow);
4657 if(cnz == (entriestoread-1))
4659 int finishedlocal = 1;
4660 MPI_Allreduce( &finishedlocal, &finishedglobal, 1, MPI_INT, MPI_BAND, commGrid->colWorld);
4661 while(!finishedglobal)
4665 commGrid->OpenDebugFile(
"Read", oput);
4666 oput <<
"To column neighbors: ";
4667 std::copy(ccurptrs, ccurptrs+colneighs, std::ostream_iterator<int>(oput,
" ")); oput << std::endl;
4673 VerticalSend(rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls, m_perproc, n_perproc,
4674 rowneighs, colneighs, buffperrowneigh, buffpercolneigh, rankinrow);
4676 MPI_Allreduce( &finishedlocal, &finishedglobal, 1, MPI_INT, MPI_BAND, commGrid->colWorld);
4681 int finishedlocal = 0;
4682 MPI_Allreduce( &finishedlocal, &finishedglobal, 1, MPI_INT, MPI_BAND, commGrid->colWorld);
4689 std::fill_n(rcurptrs, rowneighs, std::numeric_limits<int>::max());
4691 MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
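// Sketch (illustration only): both the reader above and the listeners in ReadDistribute use the
// same termination handshake -- the data source scatters per-destination counts every round, and
// a count equal to std::numeric_limits<int>::max() is the "no more data" sentinel that lets the
// listeners leave their receive loop. Standalone listener-side version (to be called on non-root
// ranks only; the payload type and helper name are hypothetical):
#include <mpi.h>
#include <vector>
#include <limits>
void listen_for_batches(MPI_Comm comm, int root)
{
    while(true)
    {
        int mycount = 0;
        MPI_Scatter(nullptr, 1, MPI_INT, &mycount, 1, MPI_INT, root, comm);  // send args ignored off-root
        if(mycount == std::numeric_limits<int>::max())
            break;                                       // sentinel: the producer is done
        std::vector<double> payload(mycount);
        MPI_Scatterv(nullptr, nullptr, nullptr, MPI_DOUBLE,
                     payload.data(), mycount, MPI_DOUBLE, root, comm);
        // ... consume payload ...
    }
}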
4701template <
class IT,
class NT,
class DER>
4702void SpParMat<IT,NT,DER>::HorizontalSend(
IT * & rows,
IT * & cols,
NT * & vals,
IT * & temprows,
IT * & tempcols,
NT * & tempvals, std::vector < std::tuple <IT,IT,NT> > & localtuples,
4703 int * rcurptrs,
int * rdispls,
IT buffperrowneigh,
int rowneighs,
int recvcount,
IT m_perproc,
IT n_perproc,
int rankinrow)
4705 rows =
new IT [ buffperrowneigh * rowneighs ];
4706 cols =
new IT [ buffperrowneigh * rowneighs ];
4707 vals =
new NT [ buffperrowneigh * rowneighs ];
4710 for(
int i=0; i< recvcount; ++i)
4712 int rowrec = std::min(
static_cast<int>(tempcols[i] / n_perproc), rowneighs-1);
4713 rows[ rowrec * buffperrowneigh + rcurptrs[rowrec] ] = temprows[i];
4714 cols[ rowrec * buffperrowneigh + rcurptrs[rowrec] ] = tempcols[i];
4715 vals[ rowrec * buffperrowneigh + rcurptrs[rowrec] ] = tempvals[i];
4716 ++ (rcurptrs[rowrec]);
4721 commGrid->OpenDebugFile(
"Read", oput);
4722 oput <<
"To row neighbors: ";
4723 std::copy(rcurptrs, rcurptrs+rowneighs, std::ostream_iterator<int>(oput,
" ")); oput << std::endl;
4724 oput <<
"Row displacements were: ";
4725 std::copy(rdispls, rdispls+rowneighs, std::ostream_iterator<int>(oput,
" ")); oput << std::endl;
4729 MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
4733 DeleteAll(temprows, tempcols, tempvals);
4734 temprows =
new IT[recvcount];
4735 tempcols =
new IT[recvcount];
4736 tempvals =
new NT[recvcount];
4739 MPI_Scatterv(rows, rcurptrs, rdispls, MPIType<IT>(), temprows, recvcount, MPIType<IT>(), rankinrow, commGrid->rowWorld);
4740 MPI_Scatterv(cols, rcurptrs, rdispls, MPIType<IT>(), tempcols, recvcount, MPIType<IT>(), rankinrow, commGrid->rowWorld);
4741 MPI_Scatterv(vals, rcurptrs, rdispls, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), rankinrow, commGrid->rowWorld);
4744 IT moffset = commGrid->myprocrow * m_perproc;
4745 IT noffset = commGrid->myproccol * n_perproc;
4747 for(
int i=0; i< recvcount; ++i)
4749 localtuples.push_back( std::make_tuple(temprows[i]-moffset, tempcols[i]-noffset, tempvals[i]) );
4752 std::fill_n(rcurptrs, rowneighs, 0);
4753 DeleteAll(rows, cols, vals, temprows, tempcols, tempvals);
4759template <
class IT,
class NT,
class DER>
4762 if((*(distrows.commGrid) != *(distcols.commGrid)) || (*(distcols.commGrid) != *(distvals.commGrid)))
4767 IT globallen = getnnz();
4777 int rank = commGrid->GetRank();
4778 int nprocs = commGrid->GetSize();
4780 prelens[
rank] = prelen;
4781 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), prelens, 1, MPIType<IT>(), commGrid->GetWorld());
4782 IT prelenuntil = std::accumulate(prelens, prelens+
rank,
static_cast<IT>(0));
4784 int * sendcnt =
new int[
nprocs]();
4785 IT * rows =
new IT[prelen];
4786 IT * cols =
new IT[prelen];
4787 NT * vals =
new NT[prelen];
4789 int rowrank = commGrid->GetRankInProcRow();
4790 int colrank = commGrid->GetRankInProcCol();
4791 int rowneighs = commGrid->GetGridCols();
4792 int colneighs = commGrid->GetGridRows();
4793 IT * locnrows =
new IT[colneighs];
4794 IT * locncols =
new IT[rowneighs];
4795 locnrows[colrank] = getlocalrows();
4796 locncols[rowrank] = getlocalcols();
4798 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locnrows, 1, MPIType<IT>(), commGrid->GetColWorld());
4799 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locncols, 1, MPIType<IT>(), commGrid->GetRowWorld());
4801 IT roffset = std::accumulate(locnrows, locnrows+colrank,
static_cast<IT>(0));
4802 IT coffset = std::accumulate(locncols, locncols+rowrank,
static_cast<IT>(0));
4805 for(
int i=0; i< prelen; ++i)
4808 int owner = nrows.Owner(prelenuntil+i, locid);
4811 rows[i] = Atuples.
rowindex(i) + roffset;
4812 cols[i] = Atuples.
colindex(i) + coffset;
4816 int * recvcnt =
new int[
nprocs];
4817 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld());
4819 int * sdpls =
new int[
nprocs]();
4820 int * rdpls =
new int[
nprocs]();
4821 std::partial_sum(sendcnt, sendcnt+
nprocs-1, sdpls+1);
4822 std::partial_sum(recvcnt, recvcnt+
nprocs-1, rdpls+1);
4824 MPI_Alltoallv(rows, sendcnt, sdpls, MPIType<IT>(),
SpHelper::p2a(nrows.arr), recvcnt, rdpls, MPIType<IT>(), commGrid->GetWorld());
4825 MPI_Alltoallv(cols, sendcnt, sdpls, MPIType<IT>(),
SpHelper::p2a(ncols.arr), recvcnt, rdpls, MPIType<IT>(), commGrid->GetWorld());
4826 MPI_Alltoallv(vals, sendcnt, sdpls, MPIType<NT>(),
SpHelper::p2a(nvals.arr), recvcnt, rdpls, MPIType<NT>(), commGrid->GetWorld());
4828 DeleteAll(sendcnt, recvcnt, sdpls, rdpls);
template <class IT, class NT, class DER>
void SpParMat<IT,NT,DER>::Find(FullyDistVec<IT,IT> & distrows, FullyDistVec<IT,IT> & distcols) const
{
    if((*(distrows.commGrid) != *(distcols.commGrid)))
    {
        SpParHelper::Print("Grids are not comparable, Find() fails!\n");
        MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
    }
    IT globallen = getnnz();
    SpTuples<IT,NT> Atuples(*spSeq);

    FullyDistVec<IT,IT> nrows(distrows.commGrid, globallen, 0);
    FullyDistVec<IT,IT> ncols(distcols.commGrid, globallen, 0);

    IT prelen = Atuples.getnnz();

    int rank = commGrid->GetRank();
    int nprocs = commGrid->GetSize();
    IT * prelens = new IT[nprocs];
    prelens[rank] = prelen;
    MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), prelens, 1, MPIType<IT>(), commGrid->GetWorld());
    IT prelenuntil = std::accumulate(prelens, prelens+rank, static_cast<IT>(0));

    int * sendcnt = new int[nprocs]();    // zero-initialized
    IT * rows = new IT[prelen];
    IT * cols = new IT[prelen];
    NT * vals = new NT[prelen];    // allocated for symmetry with the three-output overload; not redistributed here

    int rowrank = commGrid->GetRankInProcRow();
    int colrank = commGrid->GetRankInProcCol();
    int rowneighs = commGrid->GetGridCols();
    int colneighs = commGrid->GetGridRows();
    IT * locnrows = new IT[colneighs];
    IT * locncols = new IT[rowneighs];
    locnrows[colrank] = getlocalrows();
    locncols[rowrank] = getlocalcols();

    MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), locnrows, 1, MPIType<IT>(), commGrid->GetColWorld());
    MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), locncols, 1, MPIType<IT>(), commGrid->GetRowWorld());

    IT roffset = std::accumulate(locnrows, locnrows+colrank, static_cast<IT>(0));
    IT coffset = std::accumulate(locncols, locncols+rowrank, static_cast<IT>(0));
    DeleteAll(locnrows, locncols);

    for(int i = 0; i < prelen; ++i)
    {
        IT locid;    // local id on the destination; data arrives in order, so it is unused
        int owner = nrows.Owner(prelenuntil+i, locid);
        sendcnt[owner]++;

        rows[i] = Atuples.rowindex(i) + roffset;    // global row index
        cols[i] = Atuples.colindex(i) + coffset;    // global column index
    }

    int * recvcnt = new int[nprocs];
    MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld());    // share the counts

    int * sdpls = new int[nprocs]();    // displacements, zero-initialized
    int * rdpls = new int[nprocs]();
    std::partial_sum(sendcnt, sendcnt+nprocs-1, sdpls+1);
    std::partial_sum(recvcnt, recvcnt+nprocs-1, rdpls+1);

    MPI_Alltoallv(rows, sendcnt, sdpls, MPIType<IT>(), SpHelper::p2a(nrows.arr), recvcnt, rdpls, MPIType<IT>(), commGrid->GetWorld());
    MPI_Alltoallv(cols, sendcnt, sdpls, MPIType<IT>(), SpHelper::p2a(ncols.arr), recvcnt, rdpls, MPIType<IT>(), commGrid->GetWorld());

    DeleteAll(sendcnt, recvcnt, sdpls, rdpls);
    DeleteAll(prelens, rows, cols, vals);
    distrows = nrows;
    distcols = ncols;
}
template <class IT, class NT, class DER>
DER SpParMat<IT,NT,DER>::InducedSubgraphs2Procs(const FullyDistVec<IT,IT> & Assignments, std::vector<IT> & LocalIdxs) const
{
    int nprocs = commGrid->GetSize();
    int myrank = commGrid->GetRank();
    IT nverts = getnrow();

    if (nverts != getncol()) {
        SpParHelper::Print("Matrix must be square to extract induced subgraphs!\n");
        return DER();
    }
    if (nverts != Assignments.TotalLength()) {
        SpParHelper::Print("Assignments vector length does not match number of vertices!\n");
        return DER();
    }
    IT maxproc = Assignments.Reduce(maximum<IT>(), static_cast<IT>(0));    // largest rank that appears in the assignment
    if (maxproc >= static_cast<IT>(nprocs)) {
        SpParHelper::Print("Assignments vector assigns to process not in this group!\n");
        return DER();
    }

    MPI_Comm World = commGrid->GetWorld();
    MPI_Comm RowWorld = commGrid->GetRowWorld();
    MPI_Comm ColWorld = commGrid->GetColWorld();

    int rowneighs, rowrank;
    MPI_Comm_size(RowWorld, &rowneighs);
    MPI_Comm_rank(RowWorld, &rowrank);

    // Gather the locally stored pieces of Assignments across the processor row:
    // rowvecs[r] then gives the owner process of local row index r.
    int mylocsize = static_cast<int>(Assignments.LocArrSize());
    std::vector<int> rowvecs_counts(rowneighs, 0);
    std::vector<int> rowvecs_displs(rowneighs, 0);
    rowvecs_counts[rowrank] = mylocsize;
    MPI_Allgather(MPI_IN_PLACE, 0, MPI_INT, rowvecs_counts.data(), 1, MPI_INT, RowWorld);
    std::partial_sum(rowvecs_counts.begin(), rowvecs_counts.end()-1, rowvecs_displs.begin()+1);
    size_t rowvecs_size = std::accumulate(rowvecs_counts.begin(), rowvecs_counts.end(), static_cast<size_t>(0));
    std::vector<IT> rowvecs(rowvecs_size);
    MPI_Allgatherv(Assignments.GetLocArr(), mylocsize, MPIType<IT>(), rowvecs.data(), rowvecs_counts.data(), rowvecs_displs.data(), MPIType<IT>(), RowWorld);

    // Swap the gathered assignments with the complement (transpose) process:
    // complement_rowvecs[c] then gives the owner process of local column index c.
    int complement_rank = commGrid->GetComplementRank();
    int complement_rowvecs_size = 0;
    int rowvecs_size_int = static_cast<int>(rowvecs_size);
    MPI_Sendrecv(&rowvecs_size_int, 1, MPI_INT, complement_rank, TRX,
                 &complement_rowvecs_size, 1, MPI_INT, complement_rank, TRX,
                 World, MPI_STATUS_IGNORE);
    std::vector<IT> complement_rowvecs(complement_rowvecs_size);
    MPI_Sendrecv(rowvecs.data(), rowvecs_size_int, MPIType<IT>(), complement_rank, TRX,
                 complement_rowvecs.data(), complement_rowvecs_size, MPIType<IT>(), complement_rank, TRX,
                 World, MPI_STATUS_IGNORE);

    std::vector<std::vector<std::tuple<IT,IT,NT>>> svec(nprocs);
    std::vector<int> sendcounts(nprocs, 0);
    std::vector<int> recvcounts(nprocs, 0);
    std::vector<int> sdispls(nprocs, 0);
    std::vector<int> rdispls(nprocs, 0);

    IT row_offset, col_offset;
    GetPlaceInGlobalGrid(row_offset, col_offset);

    // Keep only the nonzeros whose row and column endpoints are assigned to the
    // same process, and queue each such edge for that owner in global coordinates.
    for (auto colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) {
        IT destproc = complement_rowvecs[colit.colid()];
        for (auto nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit) {
            if (destproc == rowvecs[nzit.rowid()]) {
                svec[destproc].emplace_back(row_offset + nzit.rowid(), col_offset + colit.colid(), nzit.value());
                sendcounts[destproc]++;
            }
        }
    }

    MPI_Alltoall(sendcounts.data(), 1, MPI_INT, recvcounts.data(), 1, MPI_INT, World);

    size_t sbuflen = std::accumulate(sendcounts.begin(), sendcounts.end(), static_cast<size_t>(0));
    size_t rbuflen = std::accumulate(recvcounts.begin(), recvcounts.end(), static_cast<size_t>(0));
    std::partial_sum(sendcounts.begin(), sendcounts.end()-1, sdispls.begin()+1);
    std::partial_sum(recvcounts.begin(), recvcounts.end()-1, rdispls.begin()+1);

    std::tuple<IT,IT,NT> * sbuf = new std::tuple<IT,IT,NT>[sbuflen];
    std::tuple<IT,IT,NT> * rbuf = new std::tuple<IT,IT,NT>[rbuflen];
    for (int i = 0; i < nprocs; ++i)
        std::copy(svec[i].begin(), svec[i].end(), sbuf + sdispls[i]);

    MPI_Alltoallv(sbuf, sendcounts.data(), sdispls.data(), MPIType<std::tuple<IT,IT,NT>>(),
                  rbuf, recvcounts.data(), rdispls.data(), MPIType<std::tuple<IT,IT,NT>>(), World);
    delete [] sbuf;

    // Relabel global vertex ids with dense local ids; LocalIdxs maps a local id back to its global id.
    LocalIdxs.clear();
    LocalIdxs.shrink_to_fit();
    std::unordered_map<IT, IT> locmap;
    IT global_ids[2];
    IT new_id = 0;
    for (size_t i = 0; i < rbuflen; ++i) {
        global_ids[0] = std::get<0>(rbuf[i]);
        global_ids[1] = std::get<1>(rbuf[i]);
        for (int j = 0; j < 2; ++j) {
            if (locmap.find(global_ids[j]) == locmap.end()) {
                locmap.insert(std::make_pair(global_ids[j], new_id++));
                LocalIdxs.push_back(global_ids[j]);
            }
        }
        std::get<0>(rbuf[i]) = locmap[global_ids[0]];
        std::get<1>(rbuf[i]) = locmap[global_ids[1]];
    }

    IT localdim = LocalIdxs.size();
    DER LocalMat;
    LocalMat.Create(rbuflen, localdim, localdim, rbuf);
    return LocalMat;
}
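// Usage sketch (illustrative only; the round-robin assignment and the helper
// name are assumptions, not library code): every vertex gets an owner process,
// and each process receives the subgraph induced by its vertices as a purely
// local matrix whose indices translate back to global ids via LocalIdxs.
template <class IT, class NT, class DER>
DER InducedSubgraphExample(const SpParMat<IT,NT,DER> & A, std::vector<IT> & LocalIdxs)
{
    int nprocs = A.getcommgrid()->GetSize();
    FullyDistVec<IT,IT> Assignments(A.getcommgrid(), A.getnrow(), static_cast<IT>(0));
    Assignments.iota(A.getnrow(), static_cast<IT>(0));          // Assignments[i] = i
    Assignments.Apply([nprocs](IT v){ return v % nprocs; });    // vertex i goes to process i % nprocs
    return A.InducedSubgraphs2Procs(Assignments, LocalIdxs);    // local row/col i of the result is global vertex LocalIdxs[i]
}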
template <class IT, class NT, class DER>
std::ofstream & SpParMat<IT,NT,DER>::put(std::ofstream & outfile) const
{
    outfile << (*spSeq) << std::endl;
    return outfile;
}
template <class IU, class NU, class UDER>
std::ofstream & operator<<(std::ofstream & outfile, const SpParMat<IU,NU,UDER> & s)
{
    return s.put(outfile);    // delegate to the member function
}
template <class IT, class NT, class DER>
template <typename LIT>
int SpParMat<IT,NT,DER>::Owner(IT total_m, IT total_n, IT grow, IT gcol, LIT & lrow, LIT & lcol) const
{
    int procrows = commGrid->GetGridRows();
    int proccols = commGrid->GetGridCols();
    IT m_perproc = total_m / procrows;
    IT n_perproc = total_n / proccols;

    int own_procrow;    // processor row of the owner
    if(m_perproc != 0)
    {
        own_procrow = std::min(static_cast<int>(grow / m_perproc), procrows-1);
    }
    else    // all rows are owned by the last processor row
    {
        own_procrow = procrows-1;
    }

    int own_proccol;    // processor column of the owner
    if(n_perproc != 0)
    {
        own_proccol = std::min(static_cast<int>(gcol / n_perproc), proccols-1);
    }
    else    // all columns are owned by the last processor column
    {
        own_proccol = proccols-1;
    }

    lrow = grow - (own_procrow * m_perproc);
    lcol = gcol - (own_proccol * n_perproc);
    return commGrid->GetRank(own_procrow, own_proccol);
}
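// Worked example: with total_m = 10 rows on a grid with procrows = 4,
// m_perproc = 10/4 = 2, so global row 9 maps to processor row min(9/2, 3) = 3
// and local row 9 - 3*2 = 3; the last processor row absorbs the remainder rows.
// The column computation is symmetric.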
template <class IT, class NT, class DER>
void SpParMat<IT,NT,DER>::GetPlaceInGlobalGrid(IT & rowOffset, IT & colOffset) const
{
    IT total_rows = getnrow();
    IT total_cols = getncol();

    int procrows = commGrid->GetGridRows();
    int proccols = commGrid->GetGridCols();
    IT rows_perproc = total_rows / procrows;
    IT cols_perproc = total_cols / proccols;

    rowOffset = commGrid->GetRankInProcCol() * rows_perproc;
    colOffset = commGrid->GetRankInProcRow() * cols_perproc;
}
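// Example: for an 8x8 matrix on a 2x2 process grid, rows_perproc = cols_perproc = 4,
// so the process at grid position (1,0) reports rowOffset = 4 and colOffset = 0,
// i.e. the global indices of the first row and column of its local submatrix.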