00001
00002 #include "dataservermpi.h"
00003
00004 #ifdef MPI
00005 #include "mpi.h"
00006 #endif
00007
00008 DataServerMPI::DataServerMPI() : DataServer()
00009 {
00010 }
00011
00012 DataServerMPI::DataServerMPI(int port) : DataServer()
00013 {
00014 Start(port);
00015 }
00016
00017 DataServerMPI::~DataServerMPI()
00018 {
00019 #ifdef MPI
00020 MPI_Comm_free(&comm_dataserver);
00021 #endif
00022 }
00023
00024 int DataServerMPI::Start(int port)
00025 {
00026 #ifdef MPI
00027 MPI_Comm_dup(MPI_COMM_WORLD, &comm_dataserver);
00028 MPI_Comm_rank(comm_dataserver, &procID);
00029 MPI_Comm_size(comm_dataserver, &numCPUs);
00030 #else
00031 procID = 0;
00032 numCPUs = 1;
00033 #endif
00034 portNum = port+procID;
00035 char hostname[100];
00036 int hostnamelength;
00037 char (*rbuf)[100] = NULL;
00038 #ifdef MPI
00039 if (isMaster())
00040 ALLOC1D(&rbuf, numCPUs);
00041 MPI_Get_processor_name(hostname,&hostnamelength);
00042 MPI_Gather(hostname, 100, MPI_BYTE, rbuf, 100, MPI_BYTE, 0, comm_dataserver);
00043 if (isMaster())
00044 {
00045 ofstream file("DataServer.out");
00046 for (int i = 0; i < numCPUs; i++)
00047 file << "Proc " << i << ": " << rbuf[i] << " => " << port+i << endl << flush;
00048 file.close();
00049 FREE1D(&rbuf, numCPUs);
00050 }
00051 #endif
00052 return DataServer::Start(portNum);
00053 }
00054
00055 int DataServerMPI::Wait(char *key)
00056 {
00057 if (isMaster())
00058 DataServer::Wait(key);
00059 Barrier();
00060 return 0;
00061 }
00062
00063 int DataServerMPI::Post(char *key)
00064 {
00065 if (isMaster())
00066 DataServer::Post(key);
00067 return 0;
00068 }
00069
00070 int DataServerMPI::Synchronize(char *filename)
00071 {
00072 int update_flag;
00073 if (isMaster())
00074 update_flag = IsDirty(0);
00075 BroadcastVariable(&update_flag);
00076
00077 if (!update_flag)
00078 return POSSE_SUCCESS;
00079
00080 for (int i = 0; i < NumKeys(); i++)
00081 {
00082 if (keytable->isViewableVar(i) && keytable->isWritable(i))
00083 BroadcastData(keytable->key[i].ptr, keytable->key[i].elem_size);
00084 if (keytable->isWritable(i) && (callback || keytable->hasCallback(i)))
00085 {
00086 update_flag = IsDirtyID(i, 0);
00087 BroadcastVariable(&update_flag);
00088 if (update_flag)
00089 {
00090 if (callback)
00091 callback(Key(i));
00092 if (keytable->hasCallback(i))
00093 keytable->Callback(i);
00094 }
00095 }
00096 }
00097
00098 if (isMaster())
00099 DataServer::Synchronize(filename);
00100
00101 Barrier();
00102
00103 return POSSE_SUCCESS;
00104 }
00105
00106 void DataServerMPI::BroadcastData(void *var, int size)
00107 {
00108 #ifdef MPI
00109 MPI_Bcast(var, size, MPI_BYTE, 0, comm_dataserver);
00110 #endif
00111 }
00112
00113 void DataServerMPI::Barrier()
00114 {
00115 #ifdef MPI
00116 MPI_Barrier(comm_dataserver);
00117 #endif
00118 }
00119
00120 int DataServerMPI::RegisterInternal(char *keyword, RemoteSocket *C, int &error_flag, bool reg_flag)
00121 {
00122
00123
00124 REGISTER_VARIABLE("MPI_nprocs", "ro", numCPUs);
00125 REGISTER_VARIABLE("MPI_procID", "ro", procID);
00126
00127 return 0;
00128 }
00129