#ifndef __STDC_CONSTANT_MACROS
#define __STDC_CONSTANT_MACROS
#endif
#ifndef __STDC_LIMIT_MACROS
#define __STDC_LIMIT_MACROS
#endif

#include "CombBLAS/CombBLAS.h"
#include "CombBLAS/SpHelper.h"
// Semiring for the SpMV calls below: multiply() selects the second operand (the
// candidate label carried by the vector) and add() keeps the minimum, so an SpMV
// propagates the smallest candidate label from a vertex's neighbors.
template <typename T1, typename T2>
struct Select2ndMinSR
{
    typedef typename promote_trait<T1, T2>::T_promote T_promote;
    static T_promote id() { return std::numeric_limits<T_promote>::max(); }
    static bool returnedSAID() { return false; }
    static MPI_Op mpi_op() { return MPI_MIN; }
    static T_promote add(const T_promote & arg1, const T_promote & arg2) { return std::min(arg1, arg2); }
    static T_promote multiply(const T1 & arg1, const T2 & arg2) { return static_cast<T_promote>(arg2); }
    static void axpy(const T1 a, const T2 & x, T_promote & y) { y = add(y, multiply(a, x)); }
};

// A second semiring variant follows in the original file (its name and remaining
// members are elided in this excerpt); its add() likewise combines with std::min(a, b).
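// Illustration (not part of the original source): under this semiring an SpMV
// y = A x computes, for every row i, the minimum of x[j] over the nonzero columns j
// of that row, i.e. the smallest candidate label among i's neighbors. A sequential
// sketch of the same reduction, assuming neighbors_of_i lists the nonzero columns:
//
//   int64_t y_i = std::numeric_limits<int64_t>::max();   // Select2ndMinSR::id()
//   for (int64_t j : neighbors_of_i)                     // nonzeros A(i,j) of row i
//       y_i = std::min(y_i, x[j]);                       // add(y_i, multiply(A(i,j), x[j]))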
// Relabel a pointer-to-root forest into dense component ids 0..nCC-1.
// On return cclabel[v] is the component label of vertex v; the return value is nCC.
template <typename IT>
IT LabelCC(FullyDistVec<IT, IT> & father, FullyDistVec<IT, IT> & cclabel)
{
    cclabel = father;
    cclabel.ApplyInd([](IT val, IT ind) { return val == ind ? -1 : val; });   // mark roots with -1
    FullyDistSpVec<IT, IT> roots(cclabel, bind2nd(std::equal_to<IT>(), -1));
    roots.nziota(0);              // number the roots consecutively from 0
    cclabel.Set(roots);           // write the root numbers back into cclabel
    cclabel = cclabel(father);    // every vertex takes the label of its root
    return roots.getnnz();
}
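// Sequential analogue (illustration only; assumes father[] already points directly at
// each vertex's root, which is what the SV routine below is meant to produce):
//
//   std::vector<int64_t> label(n);
//   int64_t ncc = 0;
//   for (int64_t v = 0; v < n; ++v)
//       if (father[v] == v) label[v] = ncc++;     // number the roots 0..nCC-1
//   for (int64_t v = 0; v < n; ++v)
//       label[v] = label[father[v]];              // every vertex takes its root's label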
// Scatter the (ind, val) updates to their owner processes. A process whose expected
// incoming update count exceeds the cost of a dense reduction over its local range is
// handled with nonblocking MPI_Ireduce (MPI_MIN) into reduceBuffer instead of
// point-to-point messages. Returns the number of processes on the reduction path.
template <class IT, class NT>
int ReduceAssign(FullyDistSpVec<IT, IT> & ind, FullyDistSpVec<IT, NT> & val,
                 std::vector<std::vector<NT>> & reduceBuffer, NT MAX_FOR_REDUCE)
{
    auto commGrid = ind.getcommgrid();
    MPI_Comm World = commGrid->GetWorld();
    int nprocs = commGrid->GetSize();
    int myrank;
    MPI_Comm_rank(World, &myrank);

    std::vector<int> sendcnt(nprocs, 0);
    std::vector<int> recvcnt(nprocs);
    std::vector<std::vector<IT>> indBuf(nprocs);
    std::vector<std::vector<NT>> valBuf(nprocs);

    // indices[], values[] and loclen refer to the local nonzeros of ind and val
    // (their definitions fall in an elided part of the original file)
    for (IT i = 0; i < loclen; ++i)
    {
        IT locind;
        int owner = ind.Owner(indices[i], locind);
        if (reduceBuffer[owner].size() == 0)
        {
            indBuf[owner].push_back(locind);
            valBuf[owner].push_back(values[i]);
            sendcnt[owner]++;
        }
    }

    // how many updates would reach me via point-to-point messages
    MPI_Alltoall(sendcnt.data(), 1, MPI_INT, recvcnt.data(), 1, MPI_INT, World);
    IT totrecv = std::accumulate(recvcnt.begin(), recvcnt.end(), static_cast<IT>(0));

    // cost model: a reduction over my whole local range costs ~ MyLocLength * log2(p)
    double reduceCost = ind.MyLocLength() * log2(nprocs);
    std::vector<IT> reducecnt(nprocs, 0);
    IT reducesize = 0;
    if (reduceCost < totrecv)
        reducesize = ind.MyLocLength();
    MPI_Allgather(&reducesize, 1, MPIType<IT>(), reducecnt.data(), 1, MPIType<IT>(), World);

    int nreduce = 0;
    for (int i = 0; i < nprocs; ++i)
        if (reducecnt[i] > 0) nreduce++;

    MPI_Request * requests = new MPI_Request[nreduce];
    MPI_Status  * statuses = new MPI_Status[nreduce];
    int ireduce = 0;
    for (int i = 0; i < nprocs; ++i)
    {
        if (reducecnt[i] > 0)
        {
            // fold my own pending updates for process i into the buffer first
            reduceBuffer[i].resize(reducecnt[i], MAX_FOR_REDUCE);
            for (int j = 0; j < sendcnt[i]; j++)
                reduceBuffer[i][indBuf[i][j]] = std::min(reduceBuffer[i][indBuf[i][j]], valBuf[i][j]);
            if (i == myrank)   // I am the root of this reduction
                MPI_Ireduce(MPI_IN_PLACE, reduceBuffer[i].data(), reducecnt[i], MPIType<NT>(), MPI_MIN, i, World, &requests[ireduce++]);
            else
                MPI_Ireduce(reduceBuffer[i].data(), NULL, reducecnt[i], MPIType<NT>(), MPI_MIN, i, World, &requests[ireduce++]);
        }
    }
    MPI_Waitall(nreduce, requests, statuses);
    delete [] requests;
    delete [] statuses;
    return nreduce;
}
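// The switch above, in isolation (illustration with hypothetical names, not original
// code): a process opts into the dense-reduction path when a reduction over its whole
// local range, roughly MyLocLength() * log2(p), is cheaper than receiving its incoming
// point-to-point updates one by one.
//
//   bool preferDenseReduce(double myLocLength, double incomingUpdates, int nprocs)
//   {
//       return myLocLength * std::log2((double)nprocs) < incomingUpdates;
//   }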
// Build a sparse vector r (of global length ind.TotalLength()) with r[ind[k]] = val[k].
// Entries whose owners were put on the reduction path by ReduceAssign arrive through
// reduceBuffer; the rest are exchanged with MPI_Alltoallv.
template <class IT, class NT>
FullyDistSpVec<IT, NT> Assign(FullyDistSpVec<IT, IT> & ind, FullyDistSpVec<IT, NT> & val)
{
    IT globallen = ind.TotalLength();
    auto commGrid = ind.getcommgrid();
    MPI_Comm World = commGrid->GetWorld();
    int nprocs = commGrid->GetSize();
    int * rdispls = new int[nprocs + 1];
    int * recvcnt = new int[nprocs];
    int * sendcnt = new int[nprocs]();   // zero-initialized
    int * sdispls = new int[nprocs + 1];

    std::vector<std::vector<NT>> reduceBuffer(nprocs);
    NT MAX_FOR_REDUCE = static_cast<NT>(globallen);   // sentinel larger than any real value
    int nreduce = ReduceAssign(ind, val, reduceBuffer, MAX_FOR_REDUCE);

    std::vector<std::vector<IT>> indBuf(nprocs);
    std::vector<std::vector<NT>> valBuf(nprocs);

    // indices[], values[] and loclen refer to the local nonzeros of ind and val
    // (their definitions fall in an elided part of the original file)
    for (IT i = 0; i < loclen; ++i)
    {
        IT locind;
        int owner = ind.Owner(indices[i], locind);
        if (reduceBuffer[owner].size() == 0)   // owner is not on the reduction path
        {
            indBuf[owner].push_back(locind);
            valBuf[owner].push_back(values[i]);
            sendcnt[owner]++;
        }
    }

    MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);
    sdispls[0] = 0;
    rdispls[0] = 0;
    for (int i = 0; i < nprocs; ++i)
    {
        sdispls[i + 1] = sdispls[i] + sendcnt[i];
        rdispls[i + 1] = rdispls[i] + recvcnt[i];
    }
    IT totsend = sdispls[nprocs];
    IT totrecv = rdispls[nprocs];

    std::vector<IT> sendInd(totsend);
    std::vector<NT> sendVal(totsend);
    for (int i = 0; i < nprocs; ++i)
    {
        std::copy(indBuf[i].begin(), indBuf[i].end(), sendInd.begin() + sdispls[i]);
        std::vector<IT>().swap(indBuf[i]);
        std::copy(valBuf[i].begin(), valBuf[i].end(), sendVal.begin() + sdispls[i]);
        std::vector<NT>().swap(valBuf[i]);
    }

    std::vector<IT> recvInd(totrecv);
    std::vector<NT> recvVal(totrecv);
    MPI_Alltoallv(sendInd.data(), sendcnt, sdispls, MPIType<IT>(), recvInd.data(), recvcnt, rdispls, MPIType<IT>(), World);
    MPI_Alltoallv(sendVal.data(), sendcnt, sdispls, MPIType<NT>(), recvVal.data(), recvcnt, rdispls, MPIType<NT>(), World);   // values use MPIType<NT>()
    DeleteAll(sdispls, rdispls, sendcnt, recvcnt);

    // harvest the entries that arrived through my own reduction buffer
    int myrank;
    MPI_Comm_rank(World, &myrank);
    if (reduceBuffer[myrank].size() > 0)
    {
        for (int i = 0; i < reduceBuffer[myrank].size(); i++)
        {
            if (reduceBuffer[myrank][i] < MAX_FOR_REDUCE)   // slot actually received an update
            {
                recvInd.push_back(i);
                recvVal.push_back(reduceBuffer[myrank][i]);
            }
        }
    }
    // ... construct and return the FullyDistSpVec from (recvInd, recvVal); elided in this excerpt
}
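// Contract of Assign, ignoring the distribution machinery (illustration only; entries
// that travel through the reduction path are combined with MPI_MIN above):
//
//   std::vector<int64_t> r(globallen, EMPTY);   // EMPTY is a stand-in for "no entry"
//   for (size_t k = 0; k < ind.size(); ++k)
//       r[ind[k]] = val[k];                     // r corresponds to the returned sparse vector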
// Decide, per process, whether its local slice of the dense vector should simply be
// broadcast to everyone (when it expects more element requests than a broadcast would
// cost). Broadcast slices are left in bcastBuffer; returns the number of broadcasts.
template <class IT, class NT>
int replicate(const FullyDistVec<IT, NT> dense, FullyDistSpVec<IT, IT> ri,
              std::vector<std::vector<NT>> & bcastBuffer)
{
    auto commGrid = dense.getcommgrid();
    MPI_Comm World = commGrid->GetWorld();
    int nprocs = commGrid->GetSize();

    std::vector<int> sendcnt(nprocs, 0);
    std::vector<int> recvcnt(nprocs, 0);

    // p[] and length refer to the locally requested global indices taken from ri
    // (their definitions fall in an elided part of the original file)
    for (IT i = 0; i < length; ++i)
    {
        IT locind;
        int owner = dense.Owner(p[i], locind);
        sendcnt[owner]++;
    }

    // how many element requests would reach me
    MPI_Alltoall(sendcnt.data(), 1, MPI_INT, recvcnt.data(), 1, MPI_INT, World);
    IT totrecv = std::accumulate(recvcnt.begin(), recvcnt.end(), static_cast<IT>(0));

    // cost model mirrors ReduceAssign: broadcasting my slice costs ~ MyLocLength * log2(p)
    double broadcast_cost = dense.MyLocLength() * log2(nprocs);
    std::vector<IT> bcastcnt(nprocs, 0);
    IT bcastsize = 0;
    if (broadcast_cost < totrecv)
        bcastsize = dense.MyLocLength();
    MPI_Allgather(&bcastsize, 1, MPIType<IT>(), bcastcnt.data(), 1, MPIType<IT>(), World);

    int nbcast = 0;
    for (int i = 0; i < nprocs; i++)
        if (bcastcnt[i] > 0) nbcast++;

    MPI_Request * requests = new MPI_Request[nbcast];
    MPI_Status  * statuses = new MPI_Status[nbcast];
    int ibcast = 0;
    const NT * arr = dense.GetLocArr();
    for (int i = 0; i < nprocs; i++)
    {
        if (bcastcnt[i] > 0)
        {
            bcastBuffer[i].resize(bcastcnt[i]);
            std::copy(arr, arr + bcastcnt[i], bcastBuffer[i].begin());
            MPI_Ibcast(bcastBuffer[i].data(), bcastcnt[i], MPIType<NT>(), i, World, &requests[ibcast++]);
        }
    }
    MPI_Waitall(nbcast, requests, statuses);
    delete [] requests;
    delete [] statuses;
    return nbcast;
}
// Gather dense[ri[k]] for every local element k of ri. Requests that hit a process
// whose slice was broadcast by replicate() are served locally from bcastBuffer; the
// remaining requests are exchanged point-to-point.
template <class IT, class NT>
FullyDistSpVec<IT, NT> Extract(const FullyDistVec<IT, NT> dense, FullyDistSpVec<IT, IT> ri)
{
    auto commGrid = dense.getcommgrid();
    MPI_Comm World = commGrid->GetWorld();
    int nprocs = commGrid->GetSize();

    std::vector<std::vector<NT>> bcastBuffer(nprocs);
    int nbcast = replicate(dense, ri, bcastBuffer);

    std::vector<std::vector<IT>> data_req(nprocs);
    std::vector<std::vector<IT>> revr_map(nprocs);   // to put received data back in request order

    // p[] and length refer to the locally requested global indices taken from ri,
    // and arr to the local part of dense (definitions elided in this excerpt)
    std::vector<NT> q(length);   // local result: q[i] will hold dense[p[i]]
    for (IT i = 0; i < length; ++i)
    {
        IT locind;
        int owner = dense.Owner(p[i], locind);
        if (bcastBuffer[owner].size() == 0)
        {
            data_req[owner].push_back(locind);   // ask the owner for this element
            revr_map[owner].push_back(i);
        }
        else
        {
            q[i] = bcastBuffer[owner][locind];   // owner's slice was broadcast; serve locally
        }
    }

    int * sendcnt = new int[nprocs];
    int * sdispls = new int[nprocs];
    for (int i = 0; i < nprocs; ++i)
        sendcnt[i] = (int) data_req[i].size();

    int * rdispls = new int[nprocs];
    int * recvcnt = new int[nprocs];
    MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);

    sdispls[0] = 0;
    rdispls[0] = 0;
    for (int i = 0; i < nprocs - 1; ++i)
    {
        sdispls[i + 1] = sdispls[i] + sendcnt[i];
        rdispls[i + 1] = rdispls[i] + recvcnt[i];
    }
    IT totsend = std::accumulate(sendcnt, sendcnt + nprocs, static_cast<IT>(0));
    IT totrecv = std::accumulate(recvcnt, recvcnt + nprocs, static_cast<IT>(0));

    IT * sendbuf = new IT[totsend];
    for (int i = 0; i < nprocs; ++i)
    {
        std::copy(data_req[i].begin(), data_req[i].end(), sendbuf + sdispls[i]);
        std::vector<IT>().swap(data_req[i]);
    }
    IT * reversemap = new IT[totsend];
    for (int i = 0; i < nprocs; ++i)
    {
        std::copy(revr_map[i].begin(), revr_map[i].end(), reversemap + sdispls[i]);
        std::vector<IT>().swap(revr_map[i]);
    }

    IT * recvbuf = new IT[totrecv];
    MPI_Alltoallv(sendbuf, sendcnt, sdispls, MPIType<IT>(), recvbuf, recvcnt, rdispls, MPIType<IT>(), World);

    // answer the requests I received from my local part of dense
    NT * databack = new NT[totrecv];
#ifdef _OPENMP
#pragma omp parallel for
#endif
    for (int i = 0; i < totrecv; ++i)
        databack[i] = arr[recvbuf[i]];

    NT * databuf = new NT[totsend];
    MPI_Alltoallv(databack, recvcnt, rdispls, MPIType<NT>(), databuf, sendcnt, sdispls, MPIType<NT>(), World);   // values use MPIType<NT>()

    // scatter the answers back to their original positions
    for (int i = 0; i < totsend; ++i)
        q[reversemap[i]] = databuf[i];

    DeleteAll(sdispls, sendcnt, databuf, reversemap);
    // ... build and return the FullyDistSpVec from q (plus remaining cleanup); elided in this excerpt
}
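// Contract of Extract, ignoring the distribution machinery (illustration only): the
// k-th local element of the result is the dense value at the k-th requested index.
//
//   std::vector<int64_t> q(ri.size());
//   for (size_t k = 0; k < ri.size(); ++k)
//       q[k] = dense[ri[k]];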
// Connected-components driver (only fragments of its body appear in this excerpt):
// D holds every vertex's current parent/label; each iteration propagates the minimum
// label over the graph with an SpMV under the Select2ndMinSR semiring, merges the
// result into the parent vector, and stops once no label changes. nCC receives the
// number of components.
template <typename IT, typename NT, typename DER>
FullyDistVec<IT, IT> SV(SpParMat<IT, NT, DER> & A, IT & nCC)
{
    // ... (declarations of D, gp, mngp, SpG, hooks, dup, mod elided in this excerpt)
    D.iota(A.getnrow(), 0);   // every vertex starts as its own parent
    // ...
    IT diff = D.TotalLength();
    for (int iter = 1; diff != 0; iter++)
    {
        if (diff * 50 > A.getnrow())   // many labels still changing: work on the whole vector
        {
            mngp = SpMV<Select2ndMinSR<NT, IT>>(A, gp);   // minimum label among each vertex's neighbors
            // the merge into the parent vector is an EWiseApply whose call site is only
            // partially visible here; its visible arguments are
            //     [](IT m, IT p) { return p; },
            //     [](IT m, IT p) { return true; },
            //     false, static_cast<IT>(0));
        }
        // ... (a second SpMV over a sparse vector of hooks is only partially visible here:)
        //     SpMV<Select2ndMinSR<IT, IT>>(A, SpG, hooks, false);
        //     ...
        //     [](IT a, IT b) { return true; }, false, A.getnrow());
        // ... (remaining steps of the iteration elided)

        // count how many parents changed in this iteration
        dup.EWiseOut(gp, [](IT a, IT b) { return static_cast<IT>(a != b); }, mod);
        diff = static_cast<IT>(mod.Reduce(std::plus<IT>(), static_cast<IT>(0)));

        // progress report (the buffer declaration is reconstructed; its size is not shown)
        char out[100];
        sprintf(out, "Iteration %d: diff %ld\n", iter, diff);
        SpParHelper::Print(out);
    }
    // ... (remainder of the routine, including computing nCC, elided in this excerpt)
}