Main Page   Class Hierarchy   Compound List   File List   Compound Members   File Members  

dataserver.cc

Go to the documentation of this file.
00001 //**************************************************************************
00002 #include <ctype.h>
00003 #include <iomanip.h>
00004 #include "errcode.h"
00005 #include "misc.h"
00006 #include "myendian.h"
00007 #include "dataserver.h"
00008 #include "mythread.h"
00009 //**************************************************************************
// Declare the two thread classes used by the server.  Their Run() bodies are
// defined further down in this file:
//   DataServerMainThread::Run(DataServer*)          - accept loop
//   DataServerChildThread::Run(ChildThreadStruct*)  - per-client service thread
// NOTE(review): the macro itself is not defined in this file (presumably it
// comes from mythread.h) - confirm what else it generates.
00010 DEFINE_TEMPLATE_THREAD_CLASS(DataServerMainThread, DataServer);
00011 DEFINE_TEMPLATE_THREAD_CLASS(DataServerChildThread, ChildThreadStruct);
00012 //**************************************************************************
00013 DataServer::DataServer()
00014 {
00015 Parent = new DataServerMainThread;
00016 keytable = new KeyTable;
00017 callback = NULL;
00018 p_run_flag = 0;
00019 }
00020 //**************************************************************************
00021 DataServer::DataServer(int port)
00022 {
00023 Parent = new DataServerMainThread;
00024 keytable = new KeyTable;
00025 callback = NULL;
00026 p_run_flag = 0;
00027 Start(port);
00028 }
00029 //**************************************************************************
00030 DataServer::~DataServer()
00031 {
00032 if (p_run_flag)
00033         Terminate();
00034 delete Parent;
00035 delete keytable;
00036 }
00037 //**************************************************************************
00038 void DataServer::RegisterCallback(void (*cb)(char*))
00039 {
00040 callback = cb;
00041 }
00042 //**************************************************************************
00043 int DataServer::NumPendingThreads()
00044 {
00045 int num_pendingthreads = 0;
00046 int size = child.size();
00047 
00048 for (int i = 0; i < size; i++)
00049         {
00050          if (child[i]->isRunning)
00051                 ++num_pendingthreads;
00052         }
00053 
00054 return num_pendingthreads;
00055 }
00056 //**************************************************************************
00057 int DataServer::Terminate()
00058 {
00059 Stop();
00060 Finalize();
00061 p_run_flag = 0;
00062 cout << "----------------------------------------------------------------------" << endl;
00063 cout << "Stopping DataServer (ID = " << p_serverID << ")." << endl;
00064 cout << "----------------------------------------------------------------------" << endl;
00065 cout << "Total Bytes sent = " << setprecision(5) << BytesSent() << " at rate of " << setprecision(4) << SendRate() << " Mbps." << endl;
00066 cout << "Total Bytes recd = " << setprecision(5) << BytesRecd() << " at rate of " << setprecision(4) << RecvRate() << " Mbps." << endl;
00067 cout << "Total Uptime = " << myTimer()-start_time << " ms." << endl;
00068 cout << "----------------------------------------------------------------------" << endl;
00069 cout.flush();
00070 
00071 return 0;
00072 }
00073 //**************************************************************************
00074 int DataServer::Stop()
00075 {
00076 int n = NumPendingThreads();
00077 
00078 if (n)
00079         {
00080          cout << "DataServer: Waiting for " << n << " client(s) to exit...." << endl << flush;
00081         }
00082 
00083 while (NumPendingThreads())
00084         {
00085          mySleep(1000); // Sleep for 1 ms
00086         }
00087 
00088 return 0;
00089 }
00090 //**************************************************************************
00091 int DataServer::Start(int port)
00092 {
00093 Register();     // Register all the variables
00094 static int count = 0;
00095 p_serverID = ++count;
00096 p_run_flag = 1;
00097 
00098 int numtries = 1;
00099 while (!TCPSocket::Bind(port)) 
00100         {
00101          cerr << "Retrying after 5 seconds." << endl << flush;
00102          mySleep(5*1000000);    // Sleep for 5 seconds...
00103          if (numtries++ == 20)
00104                 return POSSE_FAILURE;
00105         }
00106 
00107 char hostname[200];
00108 gethostname(hostname,200);
00109 cout << "DataServer: Binding for " << hostname << ":" << port << endl << flush;
00110 
00111 TCPSocket::Listen();
00112 
00113 cout << "----------------------------------------------------------------------" << endl;
00114 cout << "Starting DataServer (ID = " << p_serverID << ")" << endl;
00115 cout << "----------------------------------------------------------------------" << endl << flush;
00116 Parent->Start(this);
00117 
00118 start_time = myTimer();
00119 
00120 return POSSE_SUCCESS;
00121 }
00122 //**************************************************************************
00123 int DataServer::Finalize()
00124 {
00125 TCPSocket::InboundClose();
00126 Parent->Kill();
00127 
00128 int size = child.size();
00129 for (int i = 0; i < size; i++)
00130         {
00131          delete child[i]->client;
00132          delete child[i]->th;
00133          delete child[i];
00134         }
00135 child.clear();
00136 p_hash_table.clear();
00137 
00138 return 0;
00139 }
00140 //**************************************************************************
00141 int DataServer::AcceptClient()
00142 {
00143 RemoteSocket *Client = new RemoteSocket;
00144 *((TCPSocket*) Client) = *((TCPSocket*) this);
00145 
00146 ChildThreadStruct *T = new ChildThreadStruct;
00147 T->isRunning = 0;
00148 T->server = this; 
00149 T->ID = child.size();
00150 T->th = NULL;
00151 T->client = Client;
00152 child.push_back(T);
00153 
00154 int fail = 0;
00155 while (!Client->Accept())       // Blocks until a client connection is received
00156         {
00157          fail = 1;
00158          cerr << "Retrying connection after 2 seconds." << endl << flush;
00159          mySleep(2*1000000);    // Wait for 2 sec
00160         }
00161 
00162 if (fail)
00163         {
00164          cerr << "Connected." << endl << flush;
00165         }
00166 
00167 return T->ID;
00168 }
00169 //**************************************************************************
// Per-client service loop, executed on the child thread for child[ID].
//
// Reads the client's name, logs the connection, then repeatedly reads a
// keyword from the client and dispatches it until the keyword "_q_private"
// is received.
//
// error_flag protocol used inside the loop:
//   1  set at the top of every iteration; if it is still 1 after all
//      handlers ran, the keyword is reported as unidentified;
//   0  set here for the quit keyword and for the two file-transfer keywords;
//   2  tested at the end as "a read/write variable was updated", which
//      triggers the callbacks.
// Register() takes error_flag by reference.  NOTE(review): the
// Register_*_Internal helpers are not defined in this file; presumably they
// also update error_flag when `keyword` matches their name - confirm.
00170 void DataServer::RunThread(int ID)
00171 {
// NOTE(review): `count` is only incremented, never read in this function.
00172 int count = 0;
00173 int verification_number = VERIFICATION_NUMBER;
00174 int endiantype = isBigEndian();
00175 RemoteSocket *C = child[ID]->client;
00176 char clientname[200];
00177 
00178 cout << "======================================================================" << endl;
00179 cout << "Accepting connection from client (ID = " << ID+1 << ") : " << flush;
00180 //C->RecvLine(clientname);
// The first item received from the client is its self-reported name.
00181 GetKeyword(C, clientname);
00182 cout << clientname << ":" << C->PeerPort() << endl;
// If the socket's peer name differs from the self-reported name, the
// connection is reported as tunneled.
00183 if (strcmp(C->PeerName(), clientname) != 0)
00184         {
00185          cout << "\t[Tunneling via " << C->PeerName() << "]" << endl << flush;
00186         }
00187 cout << "======================================================================" << endl;
00188 cout.flush();
00189 char keyword[100];
00190 
// Start with an empty keyword so the loop body runs at least once.
00191 strcpy(keyword, "");
00192 int error_flag = 0;
00193 
00194 while (strcmp(keyword, "_q_private") != 0)
00195         {
00196          GetKeyword(C, keyword);  // Blocking call, does not return till the client sends a keyword
00197          error_flag = 1;
00198 #ifdef ANIDEBUG
00199          cout << "Got keyword (from client " << ID << "): " << keyword << endl << flush;
00200 #endif
00201 
// The quit keyword is valid: clear the "unidentified" marker.
00202          if (strcmp(keyword, "_q_private") == 0)
00203                 error_flag = 0;
00204 
// Built-in read-only variables: handshake verification number and the
// server's endianness.
00205          Register_Variable_Internal(C, error_flag, keyword, "_start_private", "ro", verification_number);
00206          Register_Variable_Internal(C, error_flag, keyword, "_isBigEndian", "ro", endiantype);
00207 
// NOTE(review): in both transfer branches the file name is received into a
// 200-byte buffer and no length is passed to RecvLine() here - confirm that
// RecvLine() bounds the write.
00208          if (!strcmp(keyword, "Rinternal_ftp")) // Recv File from Client
00209                 {
00210                  char filename[200];
00211                  error_flag = 0;
00212                  C->RecvLine(filename);
00213                  C->RecvFile(filename);
00214                 }
00215          if (!strcmp(keyword, "internal_ftp")) // Send File Requested to Client
00216                 {
00217                  char filename[200];
00218                  error_flag = 0; 
00219                  C->RecvLine(filename);
00220                  C->SendFile(filename);
00221                 }
00222 
// Dispatch to the registered (internal, then external) variables.
00223          Register(keyword, C, error_flag);
00224                  
// Refresh the key table snapshot between Wait() and Post() before it is
// served below.  NOTE(review): Wait()/Post() are presumably this object's
// lock - confirm.
00225          if (!strcmp(keyword, "keytable")) 
00226                 {
00227                  Wait();
00228                         UpdateKeyTable();
00229                  Post();
00230                 }
00231 
00232          Register_Structure_Internal(C, error_flag, keyword, "keytable", "ro", *keytable);
00233 
// No handler recognized the keyword.
// NOTE(review): SendVar<int>(0) is called on this object, not on C - confirm
// that the reply reaches the client.
00234          if (error_flag == 1)
00235                 {
00236                  cout << "Unidentified keyword \"" << keyword << "\"!!" << endl << flush;
00237                  SendVar<int>(0);
00238                 }
00239 
00240          if (error_flag == 2 && !isRunningMPI()) // Updating of "rw" variable
00241                 {
// Drop the first character of the keyword to obtain the name passed to the
// callbacks.
00242                  char newkeyword[100];
00243                  strcpy(newkeyword, &keyword[1]);
00244                  if (callback)
00245                         callback(newkeyword);
00246                  if (keytable->hasCallback(newkeyword))
00247                         keytable->Callback(newkeyword);
00248                 }
00249 
00250          ++count;
00251         }
00252 cout << "======================================================================" << endl;
00253 cout << "Releasing client (ID = " << ID+1 << "): " << clientname << ":" << C->PeerPort() << endl;
00254 cout << "======================================================================" << endl;
00255 cout.flush();
00256 
00257 return;
00258 }
00259 //**************************************************************************
00260 int DataServer::RegisterKey(char *key, DataType desc, char *vartype, int elem_size, int num_elems, char *perm, void *ptr)
00261 {
00262 Hash_Table::const_iterator iter = p_hash_table.find(key);
00263 if (iter != p_hash_table.end())
00264         {
00265          cout << "DataServer::RegisterKey: Key \"" << key << "\" already registered!!" << endl << flush;
00266          return iter->second;   // Key already registered
00267         }
00268 
00269 char type[80];
00270 int n = 0;
00271 
00272 for (int i = 0; i < strlen(vartype); i++)
00273         {
00274          if (!isspace(vartype[i]))
00275                 type[n++] = vartype[i];
00276         }
00277 type[n] = '\0';
00278 
00279 int id = keytable->n;
00280 ++keytable->n;
00281 p_hash_table[key] = id;
00282 
00283 #ifdef ANIDEBUG
00284 cout << "Registering key[" << id+1 << "] = \"" << key << "\"" << endl << flush;
00285 #endif
00286 
00287 keytable->Add();
00288 keytable->key[id].sem = new Semaphore;
00289 keytable->key[id].id = id;
00290 keytable->key[id].is_locked = 0;
00291 keytable->key[id].is_dirty = 0;
00292 keytable->key[id].elem_size = elem_size;
00293 keytable->key[id].num_elems = num_elems;
00294 keytable->key[id].ptr = ptr;
00295 if (strcmp(perm, "rw") == 0)
00296         keytable->key[id].perm = KEY_READ_WRITE;
00297 else if (strcmp(perm, "ro") == 0)
00298         keytable->key[id].perm = KEY_READ_ONLY;
00299 else
00300         {
00301          cout << "Invalid permission \"" << perm << "\" for keyword \"" << key << "\"!!" << endl << flush;
00302          exit(-1);
00303         }
00304 strcpy(keytable->key[id].key, key);
00305 keytable->key[id].description = desc;
00306 strcpy(keytable->key[id].type, type);
00307 
00308 keytable->key[id].vartype = UNKNOWN;
00309 
00310 static char typedesc[][20] = {"char", "short", "int", "long", "float", "double"};
00311 static VariableType vtype[] = {CHAR, SHORT, INT, LONG, FLOAT, DOUBLE};
00312 static int ntype = sizeof(typedesc)/sizeof(char[20]);
00313 
00314 for (int j = 0; j < ntype; j++)
00315         {
00316          if (strcmp(type, typedesc[j]) == 0)
00317                 { 
00318                  keytable->key[id].vartype = vtype[j];
00319                  break; 
00320                 }
00321         }
00322 
00323 return id;
00324 }
00325 //**************************************************************************
00326 int DataServer::UpdateKeyTable()
00327 {
00328 KeyTable *K = keytable;
00329 for (int i = 0; i < K->n; i++)
00330         {
00331          K->key[i].is_locked = K->key[i].sem->WaitCount() - K->key[i].sem->PostCount();
00332          int maxlen = sizeof(K->key[i].value);
00333          memcpy(K->key[i].value, GetValue(K->key[i].key), maxlen-1);
00334          K->key[i].value[maxlen-1] = '\0';
00335          if (K->key[i].is_locked < 0) K->key[i].is_locked = 0;
00336         }
00337 return 0;
00338 }
00339 //**************************************************************************
00340 int DataServer::ClearLocks()
00341 {
00342 int n = 0;
00343 for (int i = 0; i < keytable->n; i++)
00344         {
00345          keytable->key[i].is_locked = keytable->key[i].sem->WaitCount() - keytable->key[i].sem->PostCount();
00346          if (keytable->key[i].is_locked > 0)
00347                 {
00348                  ++n;
00349                  cout << "\tRemoving lock on key \"" << keytable->key[i].key << "\"." << endl << flush;
00350                  keytable->key[i].sem->Post();
00351 
00352                 }
00353         }
00354 return n;
00355 }
00356 //**************************************************************************
00357 char *DataServer::GetValue(int i)
00358 {
00359 return GetValue(Key(i));
00360 }
00361 //**************************************************************************
00362 char *DataServer::GetValue(char *keyword)
00363 {
00364 char *val = new char[100];
00365 VariableType vtype = UNKNOWN;
00366 KeyTable *K = keytable;
00367 
00368 int handle = K->getKeyHandle(keyword);
00369 if (handle < K->n && K->isViewableVar(handle))
00370         vtype = K->key[handle].vartype;
00371 
00372 // If ptr points to memory under 1 MB, it is probably invalid!!
00373 if (vtype != UNKNOWN && (int) K->key[handle].ptr < 1024*1024)
00374         vtype = UNKNOWN;
00375 
00376 switch (vtype)
00377         {
00378          case CHAR:
00379                 if (K->isCharArray(handle))
00380                         {
00381                          memcpy(val, K->key[handle].ptr, 99);
00382                          val[99] = '\0';
00383                         }
00384                 else
00385                         {
00386                          char x_char;
00387                          memcpy(&x_char, K->key[handle].ptr, sizeof(char));
00388                          sprintf(val, "%c\0", x_char);
00389                         }
00390                 break;
00391 
00392          case SHORT:
00393                 short x_short;
00394                 memcpy(&x_short, K->key[handle].ptr, sizeof(short));
00395                 sprintf(val, "%d\0", x_short);
00396                 break;
00397 
00398          case INT:
00399                 int x_int;
00400                 memcpy(&x_int, K->key[handle].ptr, sizeof(int));
00401                 sprintf(val, "%d\0", x_int);
00402                 break;
00403 
00404          case LONG:
00405                 long x_long;
00406                 memcpy(&x_long, K->key[handle].ptr, sizeof(long));
00407                 sprintf(val, "%ld\0", x_long);
00408                 break;
00409 
00410          case FLOAT:
00411                 float x_float;
00412                 memcpy(&x_float, K->key[handle].ptr, sizeof(float));
00413                 sprintf(val, "%g\0", x_float);
00414                 break;
00415 
00416          case DOUBLE:
00417                 double x_double;
00418                 memcpy(&x_double, K->key[handle].ptr, sizeof(double));
00419                 sprintf(val, "%g\0", x_double);
00420                 break;
00421 
00422          default:       // case UNKNOWN:
00423                 strcpy(val, "error");
00424                 break;
00425         }
00426         
00427 return val;
00428 }
00429 //**************************************************************************
00430 void DataServer::PrintDirty(ostream &out, int change)
00431 {
00432 for (int i = 0; i < NumKeys(); i++)
00433         if (IsDirtyID(i, change))
00434                 out << "\tDataServer: New value for \"" << Key(i) << "\" = " << GetValue(i) << endl << flush;
00435 }
00436 //**************************************************************************
00438 void DataServerMainThread::Run(DataServer *S)
00439 {
00440 signal(SIGABRT, SIG_IGN);
00441 while (!isExit())
00442         {
00443          int id = S->AcceptClient(); // Blocking call, does not return till
00444                                      // a new client request is received.
00445          S->child[id]->th = new DataServerChildThread;
00446          S->child[id]->th->Start(S->child[id]);
00447         }
00448 }
00449 //**************************************************************************
00450 void DataServerChildThread::Run(ChildThreadStruct *T)
00451 {
00452 trap_sigpipe();
00453 
00454 try
00455         {
00456          T->isRunning = 1;
00457          T->server->RunThread(T->ID);
00458          T->isRunning = 0;
00459         }
00460 catch (int &e)
00461         {
00462 #ifndef __CYGWIN32__
00463          if (e == ERR_BAD_SOCKET || e == ERR_RECV_MESG || e == ERR_SEND_MESG)
00464                 {
00465                  cout << "DataServer: Killing child thread " << T->th->ID() << "." << endl;
00466                 }
00467          if (e == ERR_SIGPIPE)
00468                 {
00469                  cout << "DataServer:SIGPIPE: Killing child thread " << T->th->ID() << "." << endl;
00470                 }
00471 #endif
00472          if (e == ERR_BAD_DATA)
00473                 {
00474                  return;
00475                 }
00476          T->server->ClearLocks();
00477          T->isRunning = 0;
00478          T->th->Kill();
00479         }
00480 catch (...)
00481         {
00482          cout << "Another exception!!" << endl;
00483         }
00484 }
00485 //**************************************************************************
00486 int DataServer::Synchronize(char *filename)
00487 {
00488 if (filename)
00489         {
00490          ofstream file(filename, ofstream::app);
00491          PrintDirty(file, 1);
00492          file.close();
00493         }
00494 else
00495         PrintDirty(cout, 1);
00496 
00497 return POSSE_SUCCESS;
00498 }
00499 //**************************************************************************
00500 int DataServer::Register(char *keyword, RemoteSocket *client, int &error_flag, bool register_flag)
00501 {
00502 if (RegisterInternal(keyword, client, error_flag, register_flag) == 1)
00503         return 1;
00504 return RegisterExternal(keyword, client, error_flag, register_flag);
00505 }
00506 //**************************************************************************

Generated on Sun Jun 16 17:36:49 2002 for POSSE: Portable Object-oriented Scientific Steering Environment by doxygen 1.2.13.1, written by Dimitri van Heesch, © 1997-2001