00001
00002 #include <ctype.h>
00003 #include <iomanip.h>
00004 #include "errcode.h"
00005 #include "misc.h"
00006 #include "myendian.h"
00007 #include "dataserver.h"
00008 #include "mythread.h"
00009
00010 DEFINE_TEMPLATE_THREAD_CLASS(DataServerMainThread, DataServer);
00011 DEFINE_TEMPLATE_THREAD_CLASS(DataServerChildThread, ChildThreadStruct);
00012
00013 DataServer::DataServer()
00014 {
00015 Parent = new DataServerMainThread;
00016 keytable = new KeyTable;
00017 callback = NULL;
00018 p_run_flag = 0;
00019 }
00020
00021 DataServer::DataServer(int port)
00022 {
00023 Parent = new DataServerMainThread;
00024 keytable = new KeyTable;
00025 callback = NULL;
00026 p_run_flag = 0;
00027 Start(port);
00028 }
00029
00030 DataServer::~DataServer()
00031 {
00032 if (p_run_flag)
00033 Terminate();
00034 delete Parent;
00035 delete keytable;
00036 }
00037
00038 void DataServer::RegisterCallback(void (*cb)(char*))
00039 {
00040 callback = cb;
00041 }
00042
00043 int DataServer::NumPendingThreads()
00044 {
00045 int num_pendingthreads = 0;
00046 int size = child.size();
00047
00048 for (int i = 0; i < size; i++)
00049 {
00050 if (child[i]->isRunning)
00051 ++num_pendingthreads;
00052 }
00053
00054 return num_pendingthreads;
00055 }
00056
00057 int DataServer::Terminate()
00058 {
00059 Stop();
00060 Finalize();
00061 p_run_flag = 0;
00062 cout << "----------------------------------------------------------------------" << endl;
00063 cout << "Stopping DataServer (ID = " << p_serverID << ")." << endl;
00064 cout << "----------------------------------------------------------------------" << endl;
00065 cout << "Total Bytes sent = " << setprecision(5) << BytesSent() << " at rate of " << setprecision(4) << SendRate() << " Mbps." << endl;
00066 cout << "Total Bytes recd = " << setprecision(5) << BytesRecd() << " at rate of " << setprecision(4) << RecvRate() << " Mbps." << endl;
00067 cout << "Total Uptime = " << myTimer()-start_time << " ms." << endl;
00068 cout << "----------------------------------------------------------------------" << endl;
00069 cout.flush();
00070
00071 return 0;
00072 }
00073
00074 int DataServer::Stop()
00075 {
00076 int n = NumPendingThreads();
00077
00078 if (n)
00079 {
00080 cout << "DataServer: Waiting for " << n << " client(s) to exit...." << endl << flush;
00081 }
00082
00083 while (NumPendingThreads())
00084 {
00085 mySleep(1000);
00086 }
00087
00088 return 0;
00089 }
00090
00091 int DataServer::Start(int port)
00092 {
00093 Register();
00094 static int count = 0;
00095 p_serverID = ++count;
00096 p_run_flag = 1;
00097
00098 int numtries = 1;
00099 while (!TCPSocket::Bind(port))
00100 {
00101 cerr << "Retrying after 5 seconds." << endl << flush;
00102 mySleep(5*1000000);
00103 if (numtries++ == 20)
00104 return POSSE_FAILURE;
00105 }
00106
00107 char hostname[200];
00108 gethostname(hostname,200);
00109 cout << "DataServer: Binding for " << hostname << ":" << port << endl << flush;
00110
00111 TCPSocket::Listen();
00112
00113 cout << "----------------------------------------------------------------------" << endl;
00114 cout << "Starting DataServer (ID = " << p_serverID << ")" << endl;
00115 cout << "----------------------------------------------------------------------" << endl << flush;
00116 Parent->Start(this);
00117
00118 start_time = myTimer();
00119
00120 return POSSE_SUCCESS;
00121 }
00122
00123 int DataServer::Finalize()
00124 {
00125 TCPSocket::InboundClose();
00126 Parent->Kill();
00127
00128 int size = child.size();
00129 for (int i = 0; i < size; i++)
00130 {
00131 delete child[i]->client;
00132 delete child[i]->th;
00133 delete child[i];
00134 }
00135 child.clear();
00136 p_hash_table.clear();
00137
00138 return 0;
00139 }
00140
00141 int DataServer::AcceptClient()
00142 {
00143 RemoteSocket *Client = new RemoteSocket;
00144 *((TCPSocket*) Client) = *((TCPSocket*) this);
00145
00146 ChildThreadStruct *T = new ChildThreadStruct;
00147 T->isRunning = 0;
00148 T->server = this;
00149 T->ID = child.size();
00150 T->th = NULL;
00151 T->client = Client;
00152 child.push_back(T);
00153
00154 int fail = 0;
00155 while (!Client->Accept())
00156 {
00157 fail = 1;
00158 cerr << "Retrying connection after 2 seconds." << endl << flush;
00159 mySleep(2*1000000);
00160 }
00161
00162 if (fail)
00163 {
00164 cerr << "Connected." << endl << flush;
00165 }
00166
00167 return T->ID;
00168 }
00169
00170 void DataServer::RunThread(int ID)
00171 {
00172 int count = 0;
00173 int verification_number = VERIFICATION_NUMBER;
00174 int endiantype = isBigEndian();
00175 RemoteSocket *C = child[ID]->client;
00176 char clientname[200];
00177
00178 cout << "======================================================================" << endl;
00179 cout << "Accepting connection from client (ID = " << ID+1 << ") : " << flush;
00180
00181 GetKeyword(C, clientname);
00182 cout << clientname << ":" << C->PeerPort() << endl;
00183 if (strcmp(C->PeerName(), clientname) != 0)
00184 {
00185 cout << "\t[Tunneling via " << C->PeerName() << "]" << endl << flush;
00186 }
00187 cout << "======================================================================" << endl;
00188 cout.flush();
00189 char keyword[100];
00190
00191 strcpy(keyword, "");
00192 int error_flag = 0;
00193
00194 while (strcmp(keyword, "_q_private") != 0)
00195 {
00196 GetKeyword(C, keyword);
00197 error_flag = 1;
00198 #ifdef ANIDEBUG
00199 cout << "Got keyword (from client " << ID << "): " << keyword << endl << flush;
00200 #endif
00201
00202 if (strcmp(keyword, "_q_private") == 0)
00203 error_flag = 0;
00204
00205 Register_Variable_Internal(C, error_flag, keyword, "_start_private", "ro", verification_number);
00206 Register_Variable_Internal(C, error_flag, keyword, "_isBigEndian", "ro", endiantype);
00207
00208 if (!strcmp(keyword, "Rinternal_ftp"))
00209 {
00210 char filename[200];
00211 error_flag = 0;
00212 C->RecvLine(filename);
00213 C->RecvFile(filename);
00214 }
00215 if (!strcmp(keyword, "internal_ftp"))
00216 {
00217 char filename[200];
00218 error_flag = 0;
00219 C->RecvLine(filename);
00220 C->SendFile(filename);
00221 }
00222
00223 Register(keyword, C, error_flag);
00224
00225 if (!strcmp(keyword, "keytable"))
00226 {
00227 Wait();
00228 UpdateKeyTable();
00229 Post();
00230 }
00231
00232 Register_Structure_Internal(C, error_flag, keyword, "keytable", "ro", *keytable);
00233
00234 if (error_flag == 1)
00235 {
00236 cout << "Unidentified keyword \"" << keyword << "\"!!" << endl << flush;
00237 SendVar<int>(0);
00238 }
00239
00240 if (error_flag == 2 && !isRunningMPI())
00241 {
00242 char newkeyword[100];
00243 strcpy(newkeyword, &keyword[1]);
00244 if (callback)
00245 callback(newkeyword);
00246 if (keytable->hasCallback(newkeyword))
00247 keytable->Callback(newkeyword);
00248 }
00249
00250 ++count;
00251 }
00252 cout << "======================================================================" << endl;
00253 cout << "Releasing client (ID = " << ID+1 << "): " << clientname << ":" << C->PeerPort() << endl;
00254 cout << "======================================================================" << endl;
00255 cout.flush();
00256
00257 return;
00258 }
00259
00260 int DataServer::RegisterKey(char *key, DataType desc, char *vartype, int elem_size, int num_elems, char *perm, void *ptr)
00261 {
00262 Hash_Table::const_iterator iter = p_hash_table.find(key);
00263 if (iter != p_hash_table.end())
00264 {
00265 cout << "DataServer::RegisterKey: Key \"" << key << "\" already registered!!" << endl << flush;
00266 return iter->second;
00267 }
00268
00269 char type[80];
00270 int n = 0;
00271
00272 for (int i = 0; i < strlen(vartype); i++)
00273 {
00274 if (!isspace(vartype[i]))
00275 type[n++] = vartype[i];
00276 }
00277 type[n] = '\0';
00278
00279 int id = keytable->n;
00280 ++keytable->n;
00281 p_hash_table[key] = id;
00282
00283 #ifdef ANIDEBUG
00284 cout << "Registering key[" << id+1 << "] = \"" << key << "\"" << endl << flush;
00285 #endif
00286
00287 keytable->Add();
00288 keytable->key[id].sem = new Semaphore;
00289 keytable->key[id].id = id;
00290 keytable->key[id].is_locked = 0;
00291 keytable->key[id].is_dirty = 0;
00292 keytable->key[id].elem_size = elem_size;
00293 keytable->key[id].num_elems = num_elems;
00294 keytable->key[id].ptr = ptr;
00295 if (strcmp(perm, "rw") == 0)
00296 keytable->key[id].perm = KEY_READ_WRITE;
00297 else if (strcmp(perm, "ro") == 0)
00298 keytable->key[id].perm = KEY_READ_ONLY;
00299 else
00300 {
00301 cout << "Invalid permission \"" << perm << "\" for keyword \"" << key << "\"!!" << endl << flush;
00302 exit(-1);
00303 }
00304 strcpy(keytable->key[id].key, key);
00305 keytable->key[id].description = desc;
00306 strcpy(keytable->key[id].type, type);
00307
00308 keytable->key[id].vartype = UNKNOWN;
00309
00310 static char typedesc[][20] = {"char", "short", "int", "long", "float", "double"};
00311 static VariableType vtype[] = {CHAR, SHORT, INT, LONG, FLOAT, DOUBLE};
00312 static int ntype = sizeof(typedesc)/sizeof(char[20]);
00313
00314 for (int j = 0; j < ntype; j++)
00315 {
00316 if (strcmp(type, typedesc[j]) == 0)
00317 {
00318 keytable->key[id].vartype = vtype[j];
00319 break;
00320 }
00321 }
00322
00323 return id;
00324 }
00325
00326 int DataServer::UpdateKeyTable()
00327 {
00328 KeyTable *K = keytable;
00329 for (int i = 0; i < K->n; i++)
00330 {
00331 K->key[i].is_locked = K->key[i].sem->WaitCount() - K->key[i].sem->PostCount();
00332 int maxlen = sizeof(K->key[i].value);
00333 memcpy(K->key[i].value, GetValue(K->key[i].key), maxlen-1);
00334 K->key[i].value[maxlen-1] = '\0';
00335 if (K->key[i].is_locked < 0) K->key[i].is_locked = 0;
00336 }
00337 return 0;
00338 }
00339
00340 int DataServer::ClearLocks()
00341 {
00342 int n = 0;
00343 for (int i = 0; i < keytable->n; i++)
00344 {
00345 keytable->key[i].is_locked = keytable->key[i].sem->WaitCount() - keytable->key[i].sem->PostCount();
00346 if (keytable->key[i].is_locked > 0)
00347 {
00348 ++n;
00349 cout << "\tRemoving lock on key \"" << keytable->key[i].key << "\"." << endl << flush;
00350 keytable->key[i].sem->Post();
00351
00352 }
00353 }
00354 return n;
00355 }
00356
00357 char *DataServer::GetValue(int i)
00358 {
00359 return GetValue(Key(i));
00360 }
00361
00362 char *DataServer::GetValue(char *keyword)
00363 {
00364 char *val = new char[100];
00365 VariableType vtype = UNKNOWN;
00366 KeyTable *K = keytable;
00367
00368 int handle = K->getKeyHandle(keyword);
00369 if (handle < K->n && K->isViewableVar(handle))
00370 vtype = K->key[handle].vartype;
00371
00372
00373 if (vtype != UNKNOWN && (int) K->key[handle].ptr < 1024*1024)
00374 vtype = UNKNOWN;
00375
00376 switch (vtype)
00377 {
00378 case CHAR:
00379 if (K->isCharArray(handle))
00380 {
00381 memcpy(val, K->key[handle].ptr, 99);
00382 val[99] = '\0';
00383 }
00384 else
00385 {
00386 char x_char;
00387 memcpy(&x_char, K->key[handle].ptr, sizeof(char));
00388 sprintf(val, "%c\0", x_char);
00389 }
00390 break;
00391
00392 case SHORT:
00393 short x_short;
00394 memcpy(&x_short, K->key[handle].ptr, sizeof(short));
00395 sprintf(val, "%d\0", x_short);
00396 break;
00397
00398 case INT:
00399 int x_int;
00400 memcpy(&x_int, K->key[handle].ptr, sizeof(int));
00401 sprintf(val, "%d\0", x_int);
00402 break;
00403
00404 case LONG:
00405 long x_long;
00406 memcpy(&x_long, K->key[handle].ptr, sizeof(long));
00407 sprintf(val, "%ld\0", x_long);
00408 break;
00409
00410 case FLOAT:
00411 float x_float;
00412 memcpy(&x_float, K->key[handle].ptr, sizeof(float));
00413 sprintf(val, "%g\0", x_float);
00414 break;
00415
00416 case DOUBLE:
00417 double x_double;
00418 memcpy(&x_double, K->key[handle].ptr, sizeof(double));
00419 sprintf(val, "%g\0", x_double);
00420 break;
00421
00422 default:
00423 strcpy(val, "error");
00424 break;
00425 }
00426
00427 return val;
00428 }
00429
00430 void DataServer::PrintDirty(ostream &out, int change)
00431 {
00432 for (int i = 0; i < NumKeys(); i++)
00433 if (IsDirtyID(i, change))
00434 out << "\tDataServer: New value for \"" << Key(i) << "\" = " << GetValue(i) << endl << flush;
00435 }
00436
00438
00439 {
00440 signal(SIGABRT, SIG_IGN);
00441 while (!isExit())
00442 {
00443 int id = S->AcceptClient();
00444
00445 S->child[id]->th = new DataServerChildThread;
00446 S->child[id]->th->Start(S->child[id]);
00447 }
00448 }
00449
00450 void DataServerChildThread::Run(ChildThreadStruct *T)
00451 {
00452 trap_sigpipe();
00453
00454 try
00455 {
00456 T->isRunning = 1;
00457 T->server->RunThread(T->ID);
00458 T->isRunning = 0;
00459 }
00460 catch (int &e)
00461 {
00462 #ifndef __CYGWIN32__
00463 if (e == ERR_BAD_SOCKET || e == ERR_RECV_MESG || e == ERR_SEND_MESG)
00464 {
00465 cout << "DataServer: Killing child thread " << T->th->ID() << "." << endl;
00466 }
00467 if (e == ERR_SIGPIPE)
00468 {
00469 cout << "DataServer:SIGPIPE: Killing child thread " << T->th->ID() << "." << endl;
00470 }
00471 #endif
00472 if (e == ERR_BAD_DATA)
00473 {
00474 return;
00475 }
00476 T->server->ClearLocks();
00477 T->isRunning = 0;
00478 T->th->Kill();
00479 }
00480 catch (...)
00481 {
00482 cout << "Another exception!!" << endl;
00483 }
00484 }
00485
00486 int DataServer::Synchronize(char *filename)
00487 {
00488 if (filename)
00489 {
00490 ofstream file(filename, ofstream::app);
00491 PrintDirty(file, 1);
00492 file.close();
00493 }
00494 else
00495 PrintDirty(cout, 1);
00496
00497 return POSSE_SUCCESS;
00498 }
00499
00500 int DataServer::Register(char *keyword, RemoteSocket *client, int &error_flag, bool register_flag)
00501 {
00502 if (RegisterInternal(keyword, client, error_flag, register_flag) == 1)
00503 return 1;
00504 return RegisterExternal(keyword, client, error_flag, register_flag);
00505 }
00506