zbsread.h

Go to the documentation of this file.
00001 
00002 #ifndef ZBSREAD_H
00003 #define ZBSREAD_H
00004 
00005 #include <zmq.h>
00006 #include <assert.h>
00007 #include <signal.h>
00008 
00009 #if HAVE_JSON
00010 #include <json/json.h>
00011 #endif
00012 
00013 #include <zbsDataHolders.h>
00014 #include <zbsdtHelper.h>
00015 
00016 using namespace CAFE_BSHELPER;
00017 
00018 //File-scope IntegerBitset/FloatBitset instances, one per numeric type; preferred to the BitsetV variants (commented out below) because of speed
00019 //NOTE(review): everything in this section is defined (not merely declared) in the header, so including it from more than one translation unit gives multiple-definition link errors
00020 IntegerBitset<unsigned char> bsetUChar; 
00021 IntegerBitset<char> bsetChar; 
00022 IntegerBitset<unsigned short> bsetUShort; 
00023 IntegerBitset<short> bsetShort; 
00024 IntegerBitset<unsigned int> bsetUInt; 
00025 IntegerBitset<int> bsetInt; 
00026 IntegerBitset<unsigned long long> bsetULongLong; 
00027 IntegerBitset<long long> bsetLongLong; 
00028 
00029 //IntegerBitsetV<unsigned char> bsetUCharV; 
00030 //IntegerBitsetV<char> bsetCharV; 
00031 //IntegerBitsetV<unsigned short> bsetUShortV; 
00032 //IntegerBitsetV<short> bsetShortV; 
00033 //IntegerBitsetV<unsigned int> bsetUIntV; 
00034 //IntegerBitsetV<int> bsetIntV; 
00035 //IntegerBitsetV<unsigned long long> bsetULongLongV; 
00036 //IntegerBitsetV<long long> bsetLongLongV; 
00037 
00038 FloatBitset<float>  bsetFloat;
00039 FloatBitset<double> bsetDouble;
00040 //FloatBitsetV<float>  bsetFloatV;
00041 //FloatBitsetV<double> bsetDoubleV;
00042 
00043 
00044 bool bsdtInsertFlag=false; //not referenced in the visible part of z_bsread_dbpm
00045 bool dataHeaderReceived=false; //not referenced in the visible part of z_bsread_dbpm; presumably marks receipt of the data header - TODO confirm
00046 
00047 unsigned short inDumpFlag=2; //z_bsread_dbpm sets this to 1 at the top of every receive-loop iteration; on entry it reports the socket as busy and spins while the value is 1 (where it is set back is not visible here)
00048 std::string hashIs=""; //"hash" field of the most recently parsed bsr_m-1.1 header
00049 std::string hashOriginal=""; //reference hash; overwritten with hashIs on the first header and whenever the two differ
00050 unsigned short hashOriginalFlag=0; //0 until the first bsr_m-1.1 header is seen; incremented then and on every later hash change
00051 bool fillBSPV=false; //set when the hash is new or has changed; makes the next bsr_d-1.1 header rebuild bsPV, after which it is cleared
00052 std::vector<std::string> bsPV; //channel names copied, in order, from the "channels" array of the bsr_d-1.1 header
00053 
00054 bool fill_bs_read_PV=false; //not referenced in the visible part of this file
00055 std::vector<std::string> bs_read_PV; //not referenced in the visible part of this file
00056 
00057 
00058 
00064 static void
00065 z_bsread_dbpm (DBPMKeeper & dbpm)
00066 {
00067                 if (inDumpFlag==1) {
00068                         std::cout << "zeroMQ socket is busy " << std::endl;
00069                         std::cout << "waiting for zmq timeout " << std::endl;
00070                 }
00071                 
00072                 //puts ("//START----------------------------------------//");
00073                 
00074                 void * socket = dbpm.subscriber;
00075                 
00076                 #if HAVE_JSON
00077                         Json::Value parsedFromString;
00078                         Json::Reader reader;
00079                         bool parsingSuccessful;
00080                         Json::FastWriter fastWriter;
00081                 #endif
00082                 
00083                 int64_t more;           //  Multipart detection
00084     more = 0;
00085     size_t more_size = sizeof (more);
00086                 
00087                 int bsPVIdx=-1;
00088                 dbpm.status=ICAFE_NORMAL;
00089 
00090                 int nZeroSize=0;
00091 
00092     //std::cout << "df = " << inDumpFlag << std::endl;  
00093 
00094           while (inDumpFlag==1) {
00095                   std::cout << "df/ = " << inDumpFlag << std::endl; 
00096                         std::cout << " sleeping " << std::endl;                         
00097                 }
00098 
00099                 int subMessage=0;               
00100                 int nSequentialHeader=0;
00101 
00102     while (1) {
00103                           inDumpFlag=1;
00104                                                         
00105                         //puts ("//WHILE LOOP ----------------------------------------//");
00106                         //std::cout << "subMessage " << subMessage << std::endl;
00107         //  Process all parts of the message
00108         zmq_msg_t message;
00109         zmq_msg_init (&message);
00110                                 
00111         size_t size = zmq_msg_recv (&message, socket, 0);
00112                                 //puts ("//MESSAGE RECEIVED     ----------------------------------------//");
00113                                 
00114                                 if (size == -1) {
00115                                         std::cout << " Error is " << zmq_errno() << " " << zmq_strerror(zmq_errno()) << std::endl;
00116                                         //Resource unavailable means that there is nothing to read now
00117                                         
00118                                         zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
00119                                         //std::cout << "message mulipart --> more == " << more << std::endl;
00120                                          zmq_msg_close (&message);
00121                                          //if (!more) {
00122                                          //                dbpm.status=ERRNO_EAGAIN; //ICAFE_ERRNO_BASE +zmq_errno() ;
00123                                          //                std::cout << "EARLY BREAK subMessage total: " << subMessage << std::endl;
00124                                          //                puts ("//------------------------------------END-------------------------------------//");
00125                                          //              break;      //  Last message
00126                                          // }
00127 
00128 
00129 
00130                                          dbpm.status=ICAFE_ERRNO_BASE+zmq_errno() ;
00131                                          break;
00132                                 } 
00133                                 
00134                                 
00135                                 else if (size == 0) {
00136 
00138                         
00139                                         ++nZeroSize;
00140 
00141 
00142                                          //zmq_msg_close (&message);
00143                                          //dbpm.status=ECAFE_NODATA;
00144                                          //break;
00145                                          //Comes in pairs; one for val one for timestamp
00146                                          if (nZeroSize%2==1) {
00147                                            ++bsPVIdx;
00148                                                 if (bsPVIdx >0) {
00149                                                    std::cout << bsPV[bsPVIdx] << std::endl;
00150                                                 }
00151                                          }
00152 
00153                                          ++subMessage;
00154                                          
00155                                          //continue;
00156                                 } 
00157         else { 
00158                         
00159         //  Dump the message as text or binary
00160         char *data = (char*)zmq_msg_data (&message);
00161         int is_text = 1;
00162         int char_nbr;
00163                                 
00164                                 //char cmd[5000]="";
00165                                 
00166         for (char_nbr = 0; char_nbr < size; char_nbr++)
00167             if ((unsigned char) data [char_nbr] < 32
00168             ||  (unsigned char) data [char_nbr] > 127)
00169                 is_text = 0;
00170 
00171 
00172                 //Text is two header files
00173                 //non-text size=8 is X1, Y1, Q1
00174                 //non-text size=2 is -VALID
00175                 //non-text size=16 is timestamp
00176 
00177                 /*
00178                 printf (" SIZE [%03d] ", size);
00179 
00180 
00181                 if (is_text) {
00182                         std::cout << " TEXT +++++++++++ " << std::endl;
00183                 }
00184                 else {
00185                         std::cout << " NOT TEXT +++++++++++ " << std::endl;
00186                 }
00187 
00188 
00189         for (char_nbr = 0; char_nbr < size; char_nbr++) {
00190             if (is_text) {
00191                 printf ("%c", data [char_nbr]);
00192                                                                   //snprintf(cmd + strlen(cmd), (sizeof cmd) - strlen(cmd), "%c", data [char_nbr]);             
00193                                                                                                                                                                                                                                                                         
00194                                                 }
00195             else {
00196                  printf ("%02X", (unsigned char) data [char_nbr]);
00197                                                                  //printf ("%d",   (unsigned char) data [char_nbr]);
00198                                                                  
00199                                                                 //snprintf(cmd + strlen(cmd), (sizeof cmd) - strlen(cmd), "%d", (unsigned char)data [char_nbr]);
00200                                                                 
00201                                                                 //if (data[0] == '\x7') {
00202                                                                 //      std::cout << " little endian " << std::endl;
00203                                                                 //}
00204                                                                 //else {
00205                                                                 //      std::cout << "big endian " << std::endl;
00206                                                                 //}
00207                                 
00208                                                                 
00209                                                 }
00210         }//for
00211                 printf ("\n");
00212                 */
00213                                 
00214                         
00215                                 if (is_text) {
00216                                                 parsingSuccessful=reader.parse(data, parsedFromString);
00217                                                 if (parsingSuccessful) {
00218                                                         //Json::StyledWriter styledWriter;
00219                                                         //std::cout << "STYLED: --------------------------------" << std::endl;
00220                                                         //std::cout << styledWriter.write(parsedFromString) << std::endl;
00221                                                         //std::cout << "----------------------------------" << std::endl;
00222                                                         //std::cout << parsedFromString["htype"] << std::endl;
00223                                                         
00224                                                         
00225                                                         if (fastWriter.write(parsedFromString["htype"]).find("bsr_m-1.1") != std::string::npos) { 
00226 
00227                                                                 ++nSequentialHeader;
00228 
00229                                                                 hashIs=parsedFromString["hash"].asString();
00230 
00231                                                                 if (hashOriginalFlag==0) {
00232                                                                         std::cout << hashIs << " is different to original/// " << hashOriginal << std::endl;    
00233                                                                         hashOriginal=hashIs;
00234                                                                         ++hashOriginalFlag;
00235                                                                         fillBSPV=true;
00236                                                                 }
00237 
00238                                                                 if (hashOriginal.compare(hashIs)!=0) {
00239                                                                         std::cout << hashIs << " is different to original " << hashOriginal << std::endl;       
00240                                                                         hashOriginal=hashIs;
00241                                                                         ++hashOriginalFlag;
00242                                                                         fillBSPV=true;
00243                                                                                                                                 
00244                                                                 }
00245                                                                  //std::cout << "p id " << parsedFromString["pulse_id"].asUInt64() << std::endl;
00246 
00247                                                                  dbpm.setPulse_id(parsedFromString["pulse_id"].asUInt64());
00248 
00249                                                                 //Reset values as a change of hash signifies that data from two pulse ids
00250                                                                 //is being sent in one zeromq messages
00251 
00252                                                                 subMessage=0;
00253                                                                 
00254                                                                 bsPVIdx=-1;
00255                                                                 dbpm.status=ICAFE_NORMAL;
00256 
00257                                                                 nZeroSize=0;
00258                                                                 
00259                                                                 
00260                                                         /*
00261                                                         std::cout << "(1)++++++++++++++++++++++++++++++++++++++++MAIN++++++++++++++++++++++++++++++++++++++++++"        << std::endl;
00262                                                 
00263                                                                 std::cout << "hash " << parsedFromString["hash"] << std::endl;
00264                                                                 std::cout << "p id " << parsedFromString["pulse_id"].asUInt64() << std::endl;
00265                                                                 std::cout << "g ts " << parsedFromString["global_timestamp"] << std::endl;
00266                                                                 std::cout << "comp " << parsedFromString["dh_compression"] << std::endl;
00267                                                         
00268                                                                 std::cout << "sec " << parsedFromString["global_timestamp"]["sec"].asUInt() << std::endl;
00269                                                           std::cout << "nsec " << parsedFromString["global_timestamp"]["ns"].asUInt() << std::endl;
00270                                                         */
00271                                                                 
00272                                                         }       
00273                                                         else if (fastWriter.write(parsedFromString["htype"]).find("bsr_d-1.1") != std::string::npos) { 
00274                                                         
00275                                                         ++nSequentialHeader;
00276                                                                 
00277                                                          if  (fillBSPV) {
00278                                                         
00279                                                                         bsPV.clear();
00280                                                                         bsPV.reserve(dbpm.getNPV());
00281                                                                         /*
00282                                                                         if (dbpm.getNPV() != parsedFromString["channels"].size() ) {
00283                                                                                 std::cout << "No of CONFIGURED BPMS: " << dbpm.getNPV() 
00284                                                                                      << " is diffent to that being channeled " <<  parsedFromString["channels"].size() << std::endl;
00285                                                                                 bsPV.reserve( std::max( (size_t) parsedFromString["channels"].size(),dbpm.getNPV()) );   
00286                                                                                 
00287                                                                         }
00288                                                                         
00289                                                                 */
00290                 
00291                                                                                                                                 
00292                                                                 //std::cout << "chan " << parsedFromString["channels"] << std::endl;
00293                                                                 
00294                                                                 //std::cout << "No of channels " << parsedFromString["channels"].size() << std::endl;
00295 
00296                                                                 //std::cout << "(2)++++++++++++++++++++++++++++++++++++++++HEADER++++++++++++++++++++++++++++++++++++++++++"    << std::endl;
00297                                                                                                         
00298                                                                                 for (Json::Value::ArrayIndex i=0; i < parsedFromString["channels"].size(); ++ i) {
00299                                                         
00300                                                                                         //std::cout << "name " << parsedFromString["channels"][i]["name"].asString() << std::endl;
00301                                                                                         //std::cout << "enco " << parsedFromString["channels"][i]["encoding"] << std::endl;
00302                                                                                 //std::cout << "type " << parsedFromString["channels"][i]["type"] << std::endl;
00303                                                                          
00304                                                                                                 bsPV.push_back( (parsedFromString["channels"][i]["name"]).asString());
00305                                                                                 }
00306                                                                                 /*
00307                                                                                 std::cout << "LIST OF PVS " << std::endl;
00308                                                                                 for (size_t i=0; i< bsPV.size(); ++i ) {                        
00309                                                                                         std::cout << i << " // " <<   bsPV[i].c_str() << " " << std::endl;
00310                                                                                 }
00311                                                                                 std::cout << std::endl;
00312                                                                                 */
00313                                                                                 
00314                                                                                 //std::cout << "NEW FILL: size of bsPV " <<  bsPV.size() << std::endl;
00315                                                                                 fillBSPV=false;
00316                                                                 } //if fill
00317 
00318                                                         }       
00319                                                         
00320                                                         else if (fastWriter.write(parsedFromString["htype"]).find("bsr_reconnect-1.0") != std::string::npos) {
00321                                                         
00322                                                                                 std::cout << "RECONNECT CONTROL MESSAGE " << std::endl;
00323                                                                                 std::cout << "HAS NEW SOURCE ADDRESS: " <<  fastWriter.write(parsedFromString["address"])  << std::endl;
00324                                                                         
00325                                                         }
00326                                                         else if (fastWriter.write(parsedFromString["htype"]).find("bsr_stop-1.0") != std::string::npos) {
00327                                                         
00328                                                                                 std::cout << "STOP CONTROL MESSAGE " << std::endl;
00329                                                                                 std::cout << "RECEIVED: " <<  fastWriter.write(parsedFromString["htype"])  << std::endl;
00330                                                                                 std::cout << "From BS Documentation: Message can be ignored as source will send from same address after startup" << std::endl;
00331                                                                                 
00332                                                         }
00333                                                         
00334                                                         else {
00335                                                         
00336                                                           std::cout << "HEADER IS SOMETHING ELSE: " << std::endl;
00337                                                                                 std::cout << parsedFromString["htype"] << std::endl;
00338                                                         
00339                                                                 exit(1);
00340                                                         
00341                                                         }
00342                                                         
00343                                                         
00344                                                         //std::cout << "/----------------------------------/" << std::endl;
00345                                                                 
00346                                         }
00347                                 }
00348                                 
00349                                 
00350                                 if (nSequentialHeader >3 && is_text==1) {
00351                                          std::cout << "WARNING: ZEROMQ SUB-MESSAGE DOES NOT CLOSE " << std::endl; 
00352                                          std::cout << "WARNING: FORCING ZMQ_MSG_CLOSE  " << std::endl;
00353 
00354                                          std::cout << "No of sequential headers " << nSequentialHeader << std::endl;
00355                                          std::cout << "is_text " << is_text << std::endl;
00356                                          //zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
00357                                                                  
00358                                          //zmq_msg_close (&message);
00359                                          //std::cout << "message mulipart --> more == " << more << std::endl;
00360                                                 
00361 
00362                                          //nSequentialHeader=0;
00363                                          //cannot break; results in segmentation fault
00364 
00365                                          //if (!more) {
00366                                         //                 dbpm.status=ERRNO_EAGAIN; //ICAFE_ERRNO_BASE +zmq_errno() ;
00367                                         //                 std::cout << "EARLY BREAK subMessage total: " << subMessage << std::endl;
00368                                         //                 puts ("//------------------------------------END-------------------------------------//");
00369 
00370                                         // break;      //  Last message
00371                                         //}
00372                                 
00373                                 }
00374 
00375                                 
00376                                 
00377                                 union foo
00378                                 {
00379                                 char c[sizeof(double)];
00380                                 double d;
00381                         
00382                                 } bar;
00383                                 
00384                                 //SIZE 16 is timestamp
00385                                 //SIZE 8  is x,y,Q
00386                                 //SIZE 2  is valid
00387                                 
00388                                 if (subMessage > 1430) {
00389                                   std::cout << " SIZE " << size << " subMessage " << subMessage << std::endl;
00390                                   std::cout << "size of bsPV " <<  bsPV.size() << std::endl;
00391                                 }       
00392                                         
00393                                 if (subMessage > 0 && subMessage%2 ==0) {
00394                                         if (size==8) {
00395                                                 
00396                                                 // big endian   
00397                                                 for (char_nbr = 0; char_nbr < size; char_nbr++) {
00398                                                  bar.c[char_nbr]=data[size-1-char_nbr];  // THis works for big engian
00399                                                 }
00400                                                 // little endian        
00401                                                 //for (char_nbr = 0; char_nbr < size; char_nbr++) {
00402                                                 // bar.c[char_nbr]=data[char_nbr];  
00403                                                 //}
00404                                                 
00405                                                 
00406                                                 //std::cout << "UNION D " << bar.d << std::endl;
00407                                         
00408                                                 
00409                                                 double v; // = (double*) data;
00410                                                 memcpy(&v, bar.c, sizeof(double));
00411                                                         //std::cout << " double val " << v << std::endl;
00412                                 
00413                                         //This is BPM Data - value
00414                                         ++bsPVIdx;
00415                                          
00416                                         if (dbpm.getPVIdx(bsPV[bsPVIdx]) <0)  {
00417                                           std::cout << " WARNING--> THIS CHANNEL WAS NOT REQUESTED IN CONFIGURATION FILE " << std::endl;
00418                                                 std::cout << " bsPV index = " <<  bsPVIdx << std::endl;
00419                                           std::cout << " pv from bs = " << bsPV[bsPVIdx] << std::endl;
00420                                           std::cout << " Illegal index Value =" << dbpm.getPVIdx(bsPV[bsPVIdx]) << std::endl;
00421                                                 std::cout << " SKIPPING THIS BPM... " << std::endl;
00422                                                 continue;
00423                                         }
00424                                         
00425                                         
00426                                         dbpm.pvd[dbpm.getPVIdx(bsPV[bsPVIdx])].set(v);
00427 
00428                                         dbpm.pvd[dbpm.getPVIdx(bsPV[bsPVIdx])].setStatus(ICAFE_NORMAL);
00429 
00430                                         //std::cout << " readback of value that was set = " << dbpm.pvd[dbpm.getPVIdx(bsPV[bsPVIdx])].val[0].d << std::endl;
00431                                                 
00432                                         }
00433                                         else if (size==2) {
00434                                                 unsigned short iv = 0;
00435                                                 //memcpy(&iv, cmd, size);
00436                                                         //big endian
00437                                                         for (size_t n=0; n < size; n++) {
00438                                                                 iv = (iv << 8) + data[n];
00439                                                         }
00440                                                         //little endian
00441                                                         //for (size_t n = size; n >= 0; n--) {
00442                                                         //      iv = (iv << 8) + data[n];
00443                                                         //}
00444                                         
00445                                         //This is BPM Data      - VALID/INVALID 
00446                                                         
00447                                                         //std::cout << "uint val (1 means valid) " << iv << std::endl;
00448                                                         
00449                                           /*
00450                                                 std::copy(data, data + 32, reinterpret_cast<char *>(&i));
00451                                                 std::cout << "uint16  val " << i << std::endl;
00452                                                 std::copy(data, data + 8, reinterpret_cast<char *>(&i));
00453                                                 std::cout << "uint16  val " << i << std::endl;
00454                                                 std::copy(data, data + 4, reinterpret_cast<char *>(&i));
00455                                                 std::cout << "uint16  val " << i << std::endl;
00456                                                 std::copy(data, data + 2, reinterpret_cast<char *>(&i));
00457                                                 std::cout << "uint16  val " << i << std::endl;
00458                                                 std::copy(data, data + 1, reinterpret_cast<char *>(&i));
00459                                                 std::cout << "uint16  val " << i << std::endl;
00460                                                 
00461                                                 */
00462                                                         ++bsPVIdx;
00463                                                 
00464                                                          
00465                                         if (dbpm.getPVIdx(bsPV[bsPVIdx]) <0)  {
00466                                           std::cout << " WARNING--> THIS CHANNEL WAS NOT REQUESTED IN CONFIGURATION FILE " << std::endl;
00467                                                 std::cout << " bsPV index = " <<  bsPVIdx << std::endl;
00468                                           std::cout << " pv from bs = " << bsPV[bsPVIdx] << std::endl;
00469                                           std::cout << " Illegal index Value =" << dbpm.getPVIdx(bsPV[bsPVIdx]) << std::endl;
00470                                                 std::cout << " SKIPPING THIS BPM ENUM TYPE " << std::endl;
00471                                                 continue;
00472                                         }
00473                                                 
00474                                                                 
00475                                                                 
00476                                                                 
00477                                                         if (iv==1) {
00478                                                                 dbpm.pvd[dbpm.getPVIdx(bsPV[bsPVIdx])].set((std::string) "VALID");
00479                                                                 
00480                                                         }       
00481                                                         else {
00482                                                                 dbpm.pvd[dbpm.getPVIdx(bsPV[bsPVIdx])].set((std::string) "INVALID");
00483                                                         }
00484                                                         
00485                                                                 //std::cout << "value DBPM = " << dbpm.pvd[dbpm.getPVIdx(bsPV[bsPVIdx])].val[0].str << std::endl;
00486 
00487 
00488                                                                 dbpm.pvd[dbpm.getPVIdx(bsPV[bsPVIdx])].setStatus(ICAFE_NORMAL);
00489                                         }       
00490                                 
00491                                         
00492                                 }
00493                                 else if (subMessage > 1 && subMessage%2 ==1) {
00494                                         //std::cout << "timestamp " << std::endl;
00495                                         unsigned int a=0,b=0;
00496                                         for (size_t n=0; n < size/2; n++) {
00497                                                                 a = (a << 8) + data[n];
00498                                         }
00499                                         for (size_t n=size/2; n < size; n++) {
00500                                                                 b = (b << 8) + data[n];
00501                                         }
00502                                         
00503                                         //std::cout << "a " << a << " b " << b << std::endl;
00504                                         
00505                                          
00506                                         if (dbpm.getPVIdx(bsPV[bsPVIdx]) <0)  {
00507                                           std::cout << " WARNING--> THIS CHANNEL WAS NOT REQUESTED IN CONFIGURATION FILE " << std::endl;
00508                                                 std::cout << " bsPV index = " <<  bsPVIdx << std::endl;
00509                                           std::cout << " pv from bs = " << bsPV[bsPVIdx] << std::endl;
00510                                           std::cout << " Illegal index Value =" << dbpm.getPVIdx(bsPV[bsPVIdx]) << std::endl;
00511                                                 std::cout << " SKIPPING THIS BPM TIMESTAMP " << std::endl;
00512                                                 continue;
00513                                         }
00514                                         
00515                                         
00516                                         
00517                                         dbpm.pvd[dbpm.getPVIdx(bsPV[bsPVIdx])].ts.secPastEpoch=a;
00518                                         dbpm.pvd[dbpm.getPVIdx(bsPV[bsPVIdx])].ts.nsec=b;
00519                                 }
00520                                 
00521 
00522 
00523                                         //std::cout << "subMessage above: " << subMessage << std::endl;
00524 
00525                                         ++subMessage;
00526 
00527       
00528                 } //ifelse
00529                                 
00530       
00531         zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
00532 
00533         zmq_msg_close (&message);
00534                                 
00535                 //std::cout << "value of more " << more << std::endl;
00536                                 
00537         if (!more) {
00538                                   dbpm.status=ICAFE_NORMAL;
00539                                  // std::cout << "subMessage total: " << subMessage << std::endl;
00540                                  // std::cout << " of which 0 size: " << nZeroSize << std::endl;
00541                                  // std::cout << " Percentage good: " << (subMessage-nZeroSize-2)*100/(subMessage-2);
00542                                  // puts ("//------------------------------------END-------------------------------------//");
00543                                          
00544             break;      //  Last message part
00545                                 }
00546     } //while 1
00547                 
00548                 inDumpFlag=0;
00549         
00550                 if (subMessage>2) {
00551                         dbpm.status=ICAFE_NORMAL;
00552                         //std::cout << "subMessage total: " << subMessage << std::endl;
00553                         //std::cout << " of which 0 size: " << nZeroSize << std::endl;
00554                         //std::cout << " Percentage good: " << (subMessage-nZeroSize-2)*100/(subMessage-2);
00555                         //puts ("//------------------------------------END-------------------------------------//");
00556                 }
00557 
00558 
00559                 //std::cout << "end of loop " << std::endl;
00560                 
00561                 return;
00562 }
00563 
00564 
00570 static void z_bsread (BSDataHolder &bsd){
00571 #define __METHOD__  "z_bsread (BSDataHolder &bsd)"
00572         unsigned int localDebug=0;
00573         
00574         bsd.overallStatus=ICAFE_NORMAL;
00575         
00576         //Data type index
00577         if (!bsdtInsertFlag) {
00578                 CAFE_BSHELPER::bsdtInsert();
00579                 bsdtInsertFlag=true;
00580         } 
00581         bsreadContainer_set_by_name & name_index = CAFE_BSHELPER::bsdt.get<by_bsName> ();
00582         bsreadContainer_set_by_name::iterator it_name;
00583         unsigned int bsdtIndex;
00584          
00585         if(localDebug)puts ("//START----------------------------------------//\n");
00586         int subMessage=0;
00587         void * socket = bsd.subscriber;
00588         int64_t more =0;           //  Multipart detection
00589         size_t more_size = sizeof (more);
00590         //Keep a count of the number of MULTIpart messages with zero data
00591         int nZeroSize=0;
00592         
00593         unsigned long ts,tnsec;
00594                 
00595         //Matching PV Index to BSChannel in bsd
00596         int bsPVIdx=-1;
00597         
00598         #if HAVE_JSON
00599         Json::Value parsedFromString;
00600         Json::Reader reader;    
00601         Json::FastWriter fastWriter;
00602         #endif
00603         bool parsingSuccessful; 
00604         bool mainHeaderReceived=false;  
00605         bool newHash=false;
00606         
00607         //subMessage is incremented at the end of each part of the multi-part message
00608         
00609         //The Loop
00610         while (1) {
00611 
00612                 //Process all parts of the message
00613                 zmq_msg_t message;
00614                 zmq_msg_init (&message);
00615                 size_t size = zmq_msg_recv (&message, socket, 0);
00616                 if(localDebug)printf ("[%03lu] \n", size);
00617                 if (size == -1) {
00618                         std::cout << " Error is " << zmq_errno() << " " << zmq_strerror(zmq_errno()) << std::endl;
00619                         //Resource unavailable means that there is nothing to read now
00620                         zmq_msg_close (&message);
00621                         bsd.overallStatus=ICAFE_ERRNO_BASE+zmq_errno(); 
00622                         for (size_t i=0; i < bsd.getNPV(); ++i) {
00623                                 if(bsd.getBSChannel(i).isBSEnabled()) {
00624                                         bsd.pvd[i].set(0);
00625                                         bsd.pvd[i].setStatus(bsd.overallStatus);
00626                                         bsd.pvd[i].setAlarmStatus(-1);
00627                                         bsd.pvd[i].setAlarmSeverity(-1);
00628                                         bsd.pvd[i].ts.secPastEpoch=0;
00629                                         bsd.pvd[i].ts.nsec        =0;
00630                                         bsd.pvd[i].setPulseID(0);
00631                                 }
00632                         }       
00633                         break;
00634                 } 
00635                 else if (size == 0) {
00636                         if(localDebug)std::cout << " Data of Zero SIZE for submessage " << subMessage << std::endl;
00637                         ++nZeroSize;
00638                         //Avoid timestamp blob in count
00639                         if (nZeroSize%2==1) {
00640                                 ++bsPVIdx;  //Increment when data
00641                                 bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set(0);
00642                                 bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].setStatus(ECAFE_BSREAD_MULTIPART_MESS_NODATA);
00643                                 bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].setPulseID(bsd.getPulse_id());
00644                         }
00645                         else { //Zero Timestamp
00646                                 bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].ts.secPastEpoch=0;
00647                                 bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].ts.nsec        =0;
00648                         }
00649                         if (bsd.overallStatus == ICAFE_NORMAL) {
00650                                 bsd.overallStatus=ECAFE_BSREAD_MULTIPART_MESS_NODATA;
00651                         }
00652                 } 
00653                 else { 
00654                 
00655                         //  Dump the message as text or binary
00656                         if(localDebug)printf ("[%03lu] \n", size);
00657                         
00658                         char *data = (char*)zmq_msg_data (&message);
00659                         
00660                         //Check whether it is text and, if so, whether it is the main header or not
00661                         
00662                         //First message is always the Main Header
00663                         
00664                         if (!mainHeaderReceived) {
00665                                 //std::cout << "MAIN HEADER ================================================================================= " << std::endl;
00666                                 parsingSuccessful=reader.parse(data, parsedFromString);
00667                                 if (parsingSuccessful) {
00668                                         
00669                                         if (localDebug==1) {
00670                                                 Json::StyledWriter styledWriter;
00671                                                 std::cout << "STYLED: --------------------------------" << std::endl;
00672                                                 std::cout << styledWriter.write(parsedFromString) << std::endl;
00673                                                 std::cout << "----------------------------------" << std::endl;
00674                                                 std::cout << "HEADER TYPE" << std::endl;
00675                                                 std::cout << parsedFromString["htype"].asString() << std::endl;
00676                                         }
00677                                         
00678                                         if (fastWriter.write(parsedFromString["htype"]).find("bsr_m-1.1") != std::string::npos) { 
00679                                                 
00680                                                 if (localDebug) {
00681                                                         std::cout << "hash " << parsedFromString["hash"].asString() << std::endl;
00682                                                         std::cout << "p id " << parsedFromString["pulse_id"].asUInt64() << std::endl;
00683                                                         std::cout << "g ts " << parsedFromString["global_timestamp"] << std::endl;
00684                                                         std::cout << "comp " << parsedFromString["dh_compression"].asString() << std::endl;
00685                                                         std::cout << "sec  " << parsedFromString["global_timestamp"]["sec"].asUInt() << std::endl;
00686                                                         std::cout << "nsec " << parsedFromString["global_timestamp"]["ns"].asUInt() << std::endl;
00687                                                 }
00688                                                 //required
00689                                                 bsd.setHtype( parsedFromString["htype"].asString() );
00690                                                 
00691                                                 if (bsd.getHash().compare(parsedFromString["hash"].asString()) !=0 ){
00692                                                         bsd.setHash( parsedFromString["hash"].asString() );
00693                                                         newHash=true;
00694                                                         //std::cout << " NEW HASHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHh " << std::endl;
00695                                                 }
00696                                                 bsd.setPulse_id(parsedFromString["pulse_id"].asUInt64());
00697                                                 //optional
00698                                                 if (!(parsedFromString["global_timestamp"]).empty()) {
00699                                                         bsd.setGlobal_timestamp(parsedFromString["global_timestamp"]["sec"].asUInt(),
00700                                                                                                                                                         parsedFromString["global_timestamp"]["ns"].asUInt());
00701                                                 }
00702                                                 //Check for Data Compression here!!
00703                                                 if (!(parsedFromString["dh_compression"].asString()).empty()) {                                                         
00704                                                         bsd.setDH_compression(parsedFromString["dh_compression"].asString() );                                          
00705                                                 } 
00706                                         }
00707                                         else if (fastWriter.write(parsedFromString["htype"]).find("bsr_reconnect-1.0") != std::string::npos) {
00708                                                 std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
00709                                                 std::cout << "RECONNECT CONTROL MESSAGE " << std::endl;
00710                                                 std::cout << "HAS NEW SOURCE ADDRESS: " <<  fastWriter.write(parsedFromString["address"])  << std::endl;
00711                                                 bsd.globalBSZmqStream = fastWriter.write(parsedFromString["address"]).c_str();
00712                                                 dataHeaderReceived=false;
00713                                         }
00714                                         else if (fastWriter.write(parsedFromString["htype"]).find("bsr_stop-1.0") != std::string::npos) {
00715                                                 std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
00716                                                 std::cout << "STOP CONTROL MESSAGE " << std::endl;
00717                                                 std::cout << "RECEIVED: " <<  fastWriter.write(parsedFromString["htype"])  << std::endl;
00718                                                 std::cout << "From BS Documentation: Message can be ignored as source will send from same address after startup" << std::endl;
00719                                                 dataHeaderReceived=false;
00720                                         }
00721                                         else {
00722                                                 //Maybe this is not a header!??
00723                                                 std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
00724                                                 std::cout << "HEADER IS UNRECOGNIZED: SOMETHING OTHER THAN DOCUMENETED! " << std::endl;
00725                                                 std::cout << parsedFromString["htype"] << std::endl;
00726                                                 std::cout << "EXIT PROGRAM " << std::endl;
00727                                                 dataHeaderReceived=false;
00728                                                 exit(1);
00729                                         }
00730                                         mainHeaderReceived=true;
00731                                 } //parsing successful
00732                                 else {
00733                                         std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
00734                                         std::cout << "PARSING FAILED !! " << std::endl;
00735                                         std::cout << "CANNOT PROCEED WITHOUT MAIN HEADER INFORMATION! " << std::endl;
00736                                         bsd.overallStatus=ECAFE_BSREAD_PARSEFAIL_MAINHEADER;
00737                                         
00738                                         //Close message and break from loop
00739                                         ++subMessage;
00740                                                 
00741                                         std::cout << "SUBMESSAGE " << subMessage << std::endl;
00742                                         more=0;           //  Multipart detection
00743                                         zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
00744                                         zmq_msg_close (&message);
00745                                         break;
00746                                 }
00747                         } //ifMainHeaderReceived
00748                         
00749                         
00750                         else if ( (mainHeaderReceived && !dataHeaderReceived) || (newHash && dataHeaderReceived)) {
00751                                 //std::cout << "DATA HEADER ================================================================================= " << std::endl;
00752                                 //std::cout << "mh = " << mainHeaderReceived << " dh " << dataHeaderReceived << " nh " << newHash  <<std::endl;
00753                                 newHash=false;
00754                                 //Is it compressed or not??
00755                                 if (bsd.getDH_compression().compare("bitshuffle_lz4")==0) {
00756                                         /*
00757                                         int uncompressedSize=0; int blockSize=0;
00758                                         unfoldPreBlob(data, uncompressedSize, blockSize);
00759                                         //decompress
00760                                         int src_size=uncompressedSize/sizeof(char);
00761                                         //use bitshuffle terminology
00762                                         char* regen_buffer = (char *) malloc(src_size*sizeof(char));
00763                                         if (regen_buffer == NULL) {
00764                                                 bsd.overallStatus=ECAFE_BITSHUFF_ALLOCMEM;
00765                                                 return;
00766                                         }
00767                                         
00768                                         int sizeBlob = size-BSREAD_PREBLOB_BYTES;
00769                                         if (sizeBlob <1) {sizeBlob=size;} //Should not happen
00770                                         char * dataBlob = new char[sizeBlob];
00771                                         int ij=0; 
00772                                         for (int char_nbr = BSREAD_PREBLOB_BYTES; char_nbr < size; char_nbr++) {
00773                                                 dataBlob[ij]=data[char_nbr];
00774                                                 ++ij;
00775                                         }
00776                                         //ECAFE_BITSHUFF_DECOMPRESS
00777                                         //ECAFE_BITSHUFF_REALLOCMEM
00778                                         const int decompressed_size = bshuf_decompress_lz4((const char *) dataBlob, regen_buffer, uncompressedSize, sizeof(char), 0);
00779                                         delete [] dataBlob;
00780                                         if(localDebug) std::cout << " " << " decompressed_size " << decompressed_size << std::endl;
00781                                         if (decompressed_size < 0) {bsd.overallStatus=ECAFE_BITSHUFF_DECOMPRESS; return;}
00782                                         else if (decompressed_size == 0){    
00783                                                 if(localDebug)std::cout << "0 decompressed size! Should have a positive number for successful deompression?" <<std::endl;
00784                                         }
00785                                         else {    
00786                                                 if(localDebug)std::cout << "We successfully decompressed some data!" <<std::endl;
00787                                         }
00788                                         // Not only does a positive return value mean success,
00789                                         // value returned == number of bytes regenerated from compressed_data stream.
00790                                         // We can use this to realloc() *regen_buffer to free up memory 
00791                                         //Do this to remove spurious trailing characters
00792                                         regen_buffer = (char *)realloc(regen_buffer, decompressed_size);
00793                                         if (regen_buffer == NULL) {
00794                                                 std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
00795                                                 std::cout << "WARNING with status code: " << ECAFE_BITSHUFF_REALLOCMEM << std::endl;
00796                                                 std::cout << "Failed to reallocate memory for compressed data. Not a show stopper" << std::endl;
00797                                         }
00798                                         */
00799                                         char * regen_buffer = NULL;
00800                                         int status=CAFE_BSHELPER::bitshuffleDecompress(data, regen_buffer, size, sizeof(char));
00801                                         if (status==ICAFE_NORMAL) { 
00802                                                 parsingSuccessful=reader.parse((const char *) regen_buffer, parsedFromString);
00803                                         } else 
00804                                         { 
00805                                                 std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
00806                                                 std::cout << "Data Header Compression Algorithmn: " << bsd.getDH_compression() << " FAILED! " << std:: endl;
00807                                                 std::cout << "With Status Code: " << status << std:: endl;
00808                                                 std::cout << "Discarding remainder of multi-part message! " << status << std:: endl;
00809                                                 bsd.overallStatus=status;
00810                                                 free(regen_buffer);
00811                                                 
00812                                                 //Close message and break from loop
00813                                                 ++subMessage;
00814                                                 
00815                                                 std::cout << "SUBMESSAGE " << subMessage << std::endl;
00816                                                 more=0;           //  Multipart detection
00817                                                 zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
00818                                                 zmq_msg_close (&message);
00819                                                 break;
00820                                         }
00821                                         
00822                                 } //bitshuffle_lz4
00823                                 else if (bsd.getDH_compression()=="none") {
00824                                         parsingSuccessful=reader.parse(data, parsedFromString);
00825                                 }
00826                                 else {
00827                                         std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
00828                                         std::cout << "Data Header Compression Algorithmn: " << bsd.getDH_compression() << " not recognized " << std:: endl;
00829                                         std::cout << "Assuming no compression for data header " << std::endl;
00830                                         parsingSuccessful=reader.parse(data, parsedFromString);
00831                                 }
00832                                 
00833                                 if (parsingSuccessful) {
00834                                 
00835                                         if (localDebug==1) {
00836                                                 Json::StyledWriter styledWriter;
00837                                                 std::cout << "STYLED: --------------------------------" << std::endl;
00838                                                 std::cout << styledWriter.write(parsedFromString) << std::endl;
00839                                                 std::cout << "----------------------------------" << std::endl;
00840                                                 std::cout << "HEADER TYPE" << std::endl;
00841                                                 std::cout << parsedFromString["htype"].asString() << std::endl;
00842                                         }
00843                                         
00844                                         if (fastWriter.write(parsedFromString["htype"]).find("bsr_d-1.1") != std::string::npos) { 
00845                                         
00846                                                 if (localDebug) std::cout << "chan " << parsedFromString["channels"] << std::endl;
00847                                                 
00848                                                 bsrdV.clear();
00849                                                 bsrdV.reserve(parsedFromString["channels"].size());
00850                                                 
00851                                                 for (Json::Value::ArrayIndex i=0; i < parsedFromString["channels"].size(); ++ i) {
00852                                                         
00853                                                         //New line is spit out after parsedFromString[]
00854                                                         //std::cout << "name:" << parsedFromString["channels"][i]["name"].asString()     << " //"<< std::endl;
00855                                                         //std::cout << "enco:" << parsedFromString["channels"][i]["encoding"].asString() << " //"<< std::endl;
00856                                                         //std::cout << "type:" << parsedFromString["channels"][i]["type"].asString()     << " //" <<std::endl;
00857                                                         //std::cout << "shape isArray " << parsedFromString["channels"][i]["shape"].isArray() << std::endl;
00858                                                         Json::Value iv;
00859                                                         Json::Value::ArrayIndex inpv=0;
00860                                                         
00861                                                         //std::cout << "Value as int " << parsedFromString["channels"][i]["shape"][inpv].asDouble() << std::endl;
00862                                                         //std::cout << "SIZE " << parsedFromString["channels"][i]["shape"].size() << std::endl;
00863                                                         //std::cout << "COMP " << parsedFromString["channels"][i]["compression"].asString() << std::endl;
00864                                                         std::vector<int> lshape;
00865                                                         for (Json::Value::ArrayIndex il=0; il < parsedFromString["channels"][i]["shape"].size(); ++ il) {   
00866                                                                 //std::cout << "Value as int// " << parsedFromString["channels"][i]["shape"][il].asInt() << std::endl;
00867                                                                 lshape.push_back(std::max(1, parsedFromString["channels"][i]["shape"][il].asInt()) ); 
00868                                                         }
00869                                                         
00870                                                         std::string NAME = parsedFromString["channels"][i]["name"].asString();
00871                                                         std::string ENCO = parsedFromString["channels"][i]["encoding"].asString(); // big default=little  
00872                                                         if (ENCO.empty()) {ENCO="little"; } //default
00873                                                         std::string TYPE = parsedFromString["channels"][i]["type"].asString();
00874                                                         if (TYPE.empty()) {TYPE="float64"; } //default
00875                                                         std::string COMP = parsedFromString["channels"][i]["compression"].asString();
00876                                                         if (COMP.empty()) {COMP="none"; } //default
00877                                                         
00878                                                         //std::cout << "TYPE " << TYPE << std::endl;
00879                                                         //std::cout << "TYPE: SIZE= " << TYPE.size() << std::endl; 
00880                                                         //std::cout << TYPE.substr(0,TYPE.size()) << std::endl;
00881                                                         
00882                                                         BSChannel bsc(NAME);
00883                                                         
00884                                                         //Need to find correct entry
00885                                                         //int idx= bsd.getIdxFromName(NAME);                                                    
00886                                                         //BSChannel bsc=  bsd.getBSChannel(idx); // instead of i
00887                                                         
00888                                                         bsc.setShape(lshape);
00889                                                         
00890                                                         //Need to fill in Channel Name, encoding and Type...
00891                                                         bsc.setType(TYPE);
00892                                                         bsc.setEncoding(ENCO);
00893                                                         bsc.setCompression(COMP);
00894                                                         
00895                                                         bsrdV.push_back(bsc);
00896                                                         
00897                                                         if ( bsd.getPVIdx(NAME) < 0 ) {
00898                                                                 std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
00899                                                                 std::cout << " WARNING--> THIS CHANNEL WAS NOT REQUESTED IN CONFIGURATION FILE " << std::endl;                                                  
00900                                                                 std::cout << " pv requested = " << NAME << std::endl;
00901                                                                 std::cout << " Illegal index Value =" << bsd.getPVIdx(NAME) << std::endl;
00902                                                                 std::cout << " NO FURTHER ACTION REQUIRES " << std::endl;                                                               
00903                                                         }
00904                                                         else {
00905                                                                 //Ensure there is sufficient space to hold returned data, e.g. may be an array
00906                                                                 bsd.pvd[ bsd.getPVIdx(NAME)].setNelem(bsc.getNelem());
00907                                                         }
00908                                                         int idx= bsd.getIdxFromName(NAME);
00909                                                         bsd.setBSChannel(idx, bsc);
00910                                                         
00911                                                         //std::cout << "The No elements are " << bsd.getBSChannel(i).getNelem() << std::endl;                                                   
00912                                                         //There may be more channels coming through this stream than requested
00913                                                         //therefore first fill a local vector<BSChannel>
00914                                                         //and then map this to that in BSDataHolder!
00915                                                         //It is conceivable that they do not match exactly
00916                                                 
00917                                                 } // for JsonValue
00918                                                 
00919                                                 if (localDebug) {
00920                                                         for (int i=0; i< bsrdV.size(); ++i) {
00921                                                                 std::cout << i << " " <<  bsrdV[i].getName() << " " << bsd.getPVIdx( bsrdV[i].getName()) << std::endl;
00922                                                         }
00923                                                         std::cout << "No of channels " << bsrdV.size() << " " << parsedFromString["channels"].size() << std::endl;
00924                                                         std::cout << "----------------------------------" << std::endl;
00925                                                 }
00926                                         } //bsr_d
00927                                   dataHeaderReceived=true; //set FLAG   
00928                                 } //parsingsuccessful
00929                                 else {
00930                                         std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
00931                                         std::cout << "PARSING FAILED !! " << std::endl;
00932                                         std::cout << "CANNOT PROCEED WITHOUT DATA HEADER INFORMATION! " << std::endl;
00933                                         bsd.overallStatus=ECAFE_BSREAD_PARSEFAIL_DATAHEADER;
00934                                         
00935                                         //Close message and break from loop
00936                                         ++subMessage;
00937                                                 
00938                                         std::cout << "SUBMESSAGE " << subMessage << std::endl;
00939                                         more=0;           //  Multipart detection
00940                                         zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
00941                                         zmq_msg_close (&message);
00942                                         break;
00943                                 }
00944                         
00945                         } //dataHeader
00946                         
00947                         else if (subMessage==1) {
00948                                 //header again so we skip this as hash is the same!
00949                                 //Do nothing
00950                                 if(localDebug) std::cout << "HEADER INFO ALREADY REGISTERED " << std::endl;
00951                         }
00952                         else if (subMessage%2 ==1) 
00953                         {
00954                                 //std::cout << "timestamp " << std::endl;
00955                                 
00956                                 if (bsd.getPVIdx(bsrdV[bsPVIdx].getName()) <0)  {
00957                                         std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
00958                                         std::cout << " WARNING--> THIS CHANNEL WAS NOT REQUESTED IN CONFIGURATION FILE " << std::endl;
00959                                         std::cout << " bsPV index = " <<  bsPVIdx << std::endl;
00960                                         std::cout << " pv from bs = " << bsrdV[bsPVIdx].getName() << std::endl;
00961                                         std::cout << " Expect illegal BSDataHolder index value =" << bsd.getPVIdx(bsrdV[bsPVIdx].getName()) << std::endl;
00962                                         std::cout << " SKIPPING THIS SUBMESSAGE... " << subMessage << std::endl;
00963                                 }
00964                                 else {
00965                                         ts=0; tnsec=0;
00966                                         //long test=1518777482;
00967                                         //printBits(sizeof(test), &test);
00968                                   //printBits(size, &data);
00969                                         //size==16
00970                                         /*
00971                                         std::cout << "SIZE OF TS " << size << std::endl;                                
00972                                   for (size_t n=size/2; n < size; n++) {
00973                                           char jh = data[n];            
00974                                                 printBits(sizeof(jh), &jh);
00975                                         }
00976                                         std::cout << "SIZE OF TS " << size << std::endl;
00977                                         */
00978                                         /*(=
00979                                         char * data2 =new char[size/2];
00980                                         for (size_t n=0; n < size/2; n++) {
00981                                                 data2[n]=data[n];
00982                                         }
00983                                         std::cout << "LLLONG " << bsetLongLong.unfoldScalar(data2, "big") << std::endl;
00984                                         delete [] data2;
00985                                         */
00986                                         for (size_t n=0; n < size/2; n++) {
00987                                           char jh = data[n];            
00988                                                 //printBits(sizeof(jh), &jh);
00989                                                 if ( (n+1) < size/2 ) {
00990                                                 if ( data[n+1] & 0x80) {
00991                                                         data[n]=data[n]+1;
00992                                                 }
00993                                                 }
00994                                                 //if (n==4) data[n]=data[n]+1; //Just what is the compiler playing at when shifting bits!??
00995                                                 //if (n==5) data[n]=data[n]+1; //ditto
00996                                                 ts = (ts<<8) + data[n];                                                                         
00997                                         }
00998                                         
00999                                         //printBits(sizeof(ts), &ts);
01000                                         //if(localDebug)std::cout << "ts original way " << ts  << std::endl;
01001                                   //unsigned short idx=0;
01002                                         //int ts_int=( ( (data[idx+7]<<0) & 0x7f) | ((data[idx+7]<<0) & 0x80)  |  (data[idx+6]<<8) | (data[idx+5]<<16) |  (data[idx+4]<<24) 
01003                                         //int ts_int=(  (data[idx+7]<<0)   |  (data[idx+6]<<8) | (data[idx+5]<<16) |  (data[idx+4]<<24) 
01004                                         //      | ((long long) data[idx+3]<<32) | ((long long) data[idx+2]<<40) | ((long long) data[idx+1]<<48) | ((long long) data[idx]<<56) ) ;
01005                                                 
01006                                         //std::cout << "ts_int= " << ts_int << std:: endl;
01007                                         //idx=4;
01008                                         //long long ts_long=( ( (data[idx+3]<<0) ) | ((data[idx+3]<<0) & 0x80)  |  ( (data[idx+2]<<8) ) | ((data[idx+2]<<8) & 0x80)  |  
01009                                         //                                                                              ( (data[idx+1]<<16) ) | ((data[idx+1]<<16) & 0x80)  |  ( (data[idx]<<24) ) | ((data[idx]<<24) & 0x80) );  
01010                         
01011                                                 
01012                                         //std::cout << "ts_long= " << ts_long << std:: endl;
01013                                         
01014                                         
01015                                         
01016                                   for (size_t n=size/2; n < size; n++) {
01017                                           char jh = data[n];            
01018                                                 //printBits(sizeof(jh), &jh);
01019                                                 //if (n==12) data[n]=data[n]+1;
01020                                                 if ( (n+1) < size ) {
01021                                                 if ( data[n+1] & 0x80 ) {
01022                                                         data[n]=data[n]+1;
01023                                                         //std::cout << "------------" << std::endl;
01024                                                         //jh = data[n]; 
01025                                                         //printBits(sizeof(jh), &jh);
01026                                                         //std::cout << "------------" << std::endl;
01027                                                 }
01028                                                 }
01029                                                 tnsec = (tnsec<<8) + data[n];
01030                                         }
01031                                           //printBits(sizeof(tnsec), &tnsec);
01032                                         //if(localDebug)std::cout << "tns original way " << tnsec  << std::endl;
01033                                         
01034                                         //idx=8;
01035                                         //tnsec=( ( (data[idx+7]<<0) & 0x7f) | ((data[idx+7]<<0) & 0x80)  |  (data[idx+6]<<8) | (data[idx+5]<<16) |  (data[idx+4]<<24) 
01036                                         //      | ((long long) data[idx+3]<<32) | ((long long) data[idx+2]<<40) | ((long long) data[idx+1]<<48) | ((long long) data[idx]<<56) ) ;
01037                                         
01038                                         
01039                                         if(localDebug)std::cout << "ts " << ts << " tnsec " << tnsec << std::endl;
01040                                         //Add to bsd
01041                                         bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].ts.secPastEpoch=ts;
01042                                         bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].ts.nsec        =tnsec;
01043                                         bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].setBeamEventNo(bsd.getPulse_id());
01044                                 }
01045                         }
01046                         else {
01047                                 //std::cout << " NOW COMES THE DATA " << std::endl;
01048                                 
01049                                 //if (localDebug) {
01050                                 //      for (int i=0; i< bsrdV.size(); ++i) {
01051                                 //              std::cout << i << " " <<  bsrdV[i].getName() << " " << bsd.getPVIdx(bsrdV[i].getName()) << std::endl;
01052                                 //      }
01053                                 //      std::cout << "No of channels " << bsrdV.size()  << std::endl;
01054                                 //      std::cout << "-----------------------" << std::endl;
01055                                 //}
01056                                 
01057                                 //Now comes the data
01058                                 ++bsPVIdx;
01059                                 
01060                                 if (bsd.getPVIdx(bsrdV[bsPVIdx].getName()) <0)  {
01061                                         
01062                                         std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
01063                                         std::cout << " WARNING--> THIS CHANNEL WAS NOT REQUESTED IN CONFIGURATION FILE " << std::endl;
01064                                         std::cout << " bsPV index = " <<  bsPVIdx << std::endl;
01065                                         std::cout << " pv from bs = " << bsrdV[bsPVIdx].getName() << std::endl;
01066                                         std::cout << " Expect illegal BSDataHolder index value =" << bsd.getPVIdx(bsrdV[bsPVIdx].getName()) << std::endl;
01067                                         std::cout << " SKIPPING THIS SUBMESSAGE... " << subMessage << std::endl;
01068                                         
01069                                         more=0;           //  Multipart detection
01070                                         zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
01071                                         zmq_msg_close (&message);
01072                                         ++subMessage;
01073                                         if (!more) {
01074                                                 std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
01075                                                 std::cout << " No more submessages! " << std::endl;
01076                                                 std::cout << " This message implies that pv was not accompanied by a timestamp!! Something not quite right. " << std::endl;  
01077                                                 break;      //  Last message part
01078                                         }
01079                                         continue; //top of while
01080                                 }
01081                                 
01082                                 if(localDebug) {
01083                                         std::cout << "index= " << bsPVIdx <<  std::endl;
01084                                         std::cout << "Name = " << bsrdV[bsPVIdx].getName() <<  " " << bsrdV[bsPVIdx].getType() << std::endl;
01085                                         std::cout << "IDx  = " << bsd.getPVIdx(bsrdV[bsPVIdx].getName()) << std::endl;
01086                                 }
01087                                 
01088                                 //Find enum of data type
01089                                 it_name = name_index.find(bsrdV[bsPVIdx].getType());
01090                                 if ( (it_name) != name_index.end() ) {
01091                                         bsdtIndex=(*it_name).by_bsID;
01092                                 }
01093                                 //Do we need to decompress the data?
01094                                 bool compressedFlag=false;
01095                                 BSChannel bsc=bsd.getBSChannel(bsrdV[bsPVIdx].getName());
01096                                 char * regen_buffer;
01097                                 
01098                                 if ( bsc.getCompression().compare("bitshuffle_lz4")==0)  {
01099                                 
01100                                         //Decompress
01101                                         
01102                                         int status=CAFE_BSHELPER::bitshuffleDecompress(data, regen_buffer, size, getByteSize(bsdtIndex));
01103                                         if (status==ICAFE_NORMAL) { 
01104                                                 parsingSuccessful=reader.parse((const char *) regen_buffer, parsedFromString);                                  
01105                                         }       else 
01106                                         { 
01107                                                 free(regen_buffer);
01108                                                 bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].setStatus(status);
01109                                                 bsd.overallStatus=status;
01110                                                 //skip message
01111                                                 more=0;           //  Multipart detection
01112                                                 zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
01113                                                 zmq_msg_close (&message);
01114                                                 ++subMessage;
01115                                                 if (!more) {
01116                                                         std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
01117                                                         std::cout << " No more submessages! " << std::endl;
01118                                                         std::cout << " This message implies that pv was not accompanied by a timestamp!! Something not quite right. " << std::endl;  
01119                                                         break;      //  Last message part
01120                                                 }
01121                                                 continue; //top of while
01122                                         }
01123                                         compressedFlag=true;
01124                                         
01125                                 }
01126                                 else {
01127                                         //No decompression required
01128                                         regen_buffer = data; //new char[size];
01129                                         //regen_buffer = new char[size];
01130                                         //for (int i=0; i<size; ++i) {
01131                                                 //regen_buffer[i]=data[i];
01132                                         //}
01133                                         //std::cout << size << " /// " << bsrdV[bsPVIdx].getNelem()<< std::endl;
01134                                 }
01135                                 
01136                                 //std::cout << "Compression//0//--> " << bsd.getBSChannel(0).getCompression() << std::endl;
01137                                 //std::cout << "Compression//1//--> " << bsd.getBSChannel(1).getCompression() << std::endl;
01138                                 bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].setStatus(ICAFE_NORMAL);
01139                                 
01140                                 //char,bool=1, short=2, int =4, long,long long=8
01141                                 
01142                                 switch (bsdtIndex) {
01143                                         case CAFE_BSHELPER::BS_STRING:
01144                                                 {
01145                                                 if (bsrdV[bsPVIdx].getNelem()>1) { 
01146                                                         std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
01147                                                         std::cout << "An array of strings has not been envisioned for bsREAD" << std::endl;
01148                                                         std::cout << "Assuming one string element of max size PVNAME_SIZE" << std::endl;
01149                                                 }
01150                                                 if (size > MAX_STRING_SIZE) {
01151                                                         std::cout << "Datatype is string. Size " << size << " too large; trimming to MAX_STRING_SIZE" << std::endl;
01152                                                 }
01153                                                 
01154                                                 std::string str="";;
01155                                                 for (int i=0; i<std::min((int)size, (int)MAX_STRING_SIZE); ++i) {
01156                                                         if (data[i] != '\0') {
01157                                                                 str.append(1, data[i] );
01158                                                                 //std::cout << str << " [" << i << "] " << std::endl;
01159                                                         }
01160                                                 }       
01161                                                 bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set(str);
01162                                                 if(localDebug)std::cout << " readback of string value that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].str << std::endl;
01163                                                 break;
01164                                                 }
01165                                         case CAFE_BSHELPER::BS_FLOAT32:
01166                                                 {
01167                                                 if ( bsrdV[bsPVIdx].getNelem()==1) {
01168                                                         bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set( bsetFloat.unfoldScalar(regen_buffer, bsrdV[bsPVIdx].getEncoding()) );
01169                                                         if(localDebug)std::cout << " readback of float scalar that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].f << std::endl;
01170                                                         break;
01171                                                 }                                                       
01172                                                 //std::vector<float> Vbs( bsrdV[bsPVIdx].getNelem()); //Must allocate memory here
01173                                                 //bsetFloatV.unfold(regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding(),Vbs);                 
01174                                                 //for (int ik=0; ik<Vbs.size(); ++ik) std::cout << Vbs[ik] << " [" << ik << "] " << std::endl;
01175                                                 float * ff = bsetFloat.unfold(regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding());
01176                                                 bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set(ff); 
01177                                                 if(localDebug)std::cout << " readback of float value that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].f << std::endl;
01178                                                 delete [] ff;
01179                                                 break;
01180                                                 }
01181                                         case CAFE_BSHELPER::BS_FLOAT64:
01182                                                 {
01183                                                 if ( bsrdV[bsPVIdx].getNelem()==1) {
01184                                                         bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set( bsetDouble.unfoldScalar(regen_buffer, bsrdV[bsPVIdx].getEncoding()) );
01185                                                         if(localDebug)std::cout << " readback of double scalar that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].d << std::endl;
01186                                                         break;
01187                                                 }
01188                                                 //std::vector<double> Vbs( bsrdV[bsPVIdx].getNelem()); //Must allocate memory here
01189                                                 //bsetDoubleV.unfold(regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding(),Vbs);
01190                                                 double * dd =  bsetDouble.unfold(regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding());
01191                                                 bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set(dd); 
01192                                                 if(localDebug)std::cout << " readback of double value that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].d << std::endl;
01193                                                 delete [] dd;
01194                                                 break;
01195                                                 }
01196                                         case CAFE_BSHELPER::BS_INT64:
01197                                                 {
01198                                                 if ( bsrdV[bsPVIdx].getNelem()==1) {
01199                                                         bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set( bsetLongLong.unfoldScalar(regen_buffer, bsrdV[bsPVIdx].getEncoding()) );
01200                                                         if(localDebug)std::cout << " readback of uint64 scalar that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].getAsLongLong()  << std::endl;
01201                                                         break;
01202                                                 }
01203                                                 //std::vector<long long> Vbs; 
01204                                                 //bsetLongLongV.unfold(regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding(),Vbs);
01205                                                 long long * ll=bsetLongLong.unfold (regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding());                                             
01206                                                 bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set(ll); 
01207                                                 if(localDebug) {
01208                                                         std::cout << " readback of int64 value that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].getAsLongLong() << std::endl;
01209                                                         //for (int i=0; i < bsrdV[bsPVIdx].getNelem(); ++i) {
01210                                                         //      std::cout <<  bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].getAsLongLong(i) << " [" << i << "] " ;
01211                                                         //}
01212                                                         //std::cout << std::endl;
01213                                                 }
01214                                                 delete [] ll;
01215                                                 break;
01216                                                 }
01217                                         case CAFE_BSHELPER::BS_UINT64: 
01218                                                 {
01219                                                 if ( bsrdV[bsPVIdx].getNelem()==1) {
01220                                                         bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set( bsetULongLong.unfoldScalar(regen_buffer, bsrdV[bsPVIdx].getEncoding()) );
01221                                                         if(localDebug)std::cout << " readback of uint64 scalar that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].getAsULongLong()  << std::endl;
01222                                                         break;
01223                                                 }
01224                                                 //std::vector<unsigned long long> Vbs; 
01225                                                 //bsetULongLongV.unfold(regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding(),Vbs);
01226                                                 unsigned long long * ull=bsetULongLong.unfold (regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding());                                          
01227                                                 bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set(ull); 
01228                                                 if(localDebug) {
01229                                                         std::cout << " readback of uint64 value that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].getAsULongLong() << std::endl;
01230                                                         
01231                                                         //for (int i=0; i < bsrdV[bsPVIdx].getNelem(); ++i) {
01232                                                         //      std::cout <<  bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].getAsULongLong() << " [" << i << "] " ;
01233                                                         //}
01234                                                         std::cout << std::endl;
01235                                                 }
01236                                                 delete [] ull;
01237                                                 break;
01238                                                 }
01239                                         case CAFE_BSHELPER::BS_INT32:
01240                                                 {
01241                                                 if ( bsrdV[bsPVIdx].getNelem()==1) {
01242                                                         bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set( bsetInt.unfoldScalar(regen_buffer, bsrdV[bsPVIdx].getEncoding()) );
01243                                                         if(localDebug)std::cout << " readback of int32 scalar that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].l  << std::endl;
01244                                                         break;
01245                                                 }
01246                                                 //std::vector<int> Vbs; 
01247                                                 //bsetIntV.unfold(regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding(),Vbs);
01248                                                 int * l=bsetInt.unfold (regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding());                                         
01249                                                 bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set(l); 
01250                                                 if(localDebug) {
01251                                                         std::cout << " readback of int32 value that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].l << std::endl;
01252                                                         for (int i=0; i < bsrdV[bsPVIdx].getNelem(); ++i) {
01253                                                                 std::cout <<  bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[i].l << " [" << i << "] " ;
01254                                                         }
01255                                                         std::cout << std::endl;
01256                                                 }
01257                                                 delete [] l;
01258                                                 break;
01259                                                 }
01260                                         case CAFE_BSHELPER::BS_UINT32:
01261                                                 {
01262                                                 if ( bsrdV[bsPVIdx].getNelem()==1) {
01263                                                         bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set( bsetUInt.unfoldScalar(regen_buffer, bsrdV[bsPVIdx].getEncoding()) );
01264                                                         if(localDebug)std::cout << " readback of uint32 scalar that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].getAsULong()  << std::endl;
01265                                                         break;
01266                                                 }
01267                                                 //std::vector<unsigned int> Vbs; 
01268                                                 //bsetUIntV.unfold(regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding(),Vbs);
01269                                                 unsigned int * ul=bsetUInt.unfold (regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding());                                              
01270                                                 bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set(ul); 
01271                                                 if(localDebug) {
01272                                                         std::cout << " readback of uint32 value that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].getAsULong() << std::endl;
01273                                                         //for (int i=0; i < bsrdV[bsPVIdx].getNelem(); ++i) {
01274                                                         //      std::cout <<  bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].getAsULong(i) << " [" << i << "] " ;
01275                                                         //}
01276                                                         //std::cout << std::endl;
01277                                                 }
01278                                                 delete [] ul;
01279                                                 break;
01280                                                 }
01281                                         case CAFE_BSHELPER::BS_INT16:
01282                                                 {
01283                                                 if ( bsrdV[bsPVIdx].getNelem()==1) {
01284                                                         bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set( bsetShort.unfoldScalar(regen_buffer, bsrdV[bsPVIdx].getEncoding()) );
01285                                                         if(localDebug)std::cout << " readback of short scalar that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].s << std::endl;
01286                                                         break;
01287                                                 }
01288                                                 //std::vector<short> Vbs; 
01289                                                 //bsetShortV.unfold(regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding(),Vbs);
01290                                                 
01291                                                 short * s=bsetShort.unfold (regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding());
01292                                                 //std::cout <<  "SHORT VALUE " << s[0] << std::endl;
01293                                                 bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set(s); 
01294                                                 if(localDebug) {
01295                                                         std::cout << " readback of short value that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].s << std::endl;
01296                                                         //std::cout << " nelements in pvdata " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].getNelem() << std::endl;
01297                                                         //for (int i=0; i < bsrdV[bsPVIdx].getNelem(); ++i) {
01298                                                         //      std::cout <<  bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[i].s << " [" << i << "] " ;
01299                                                         //}
01300                                                         //std::cout << std::endl;
01301                                                 }       
01302                                                 delete [] s;
01303                                                 break;
01304                                                 }
01305                                         case CAFE_BSHELPER::BS_UINT16:
01306                                                 {
01307                                                 if ( bsrdV[bsPVIdx].getNelem()==1) {
01308                                                         bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set( bsetUShort.unfoldScalar(regen_buffer, bsrdV[bsPVIdx].getEncoding()) );
01309                                                         if(localDebug)std::cout << " readback of ushort scalar that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].us << std::endl;
01310                                                         break;
01311                                                 }
01312                                                 //std::vector<unsigned short> Vbs; 
01313                                                 //bsetUShortV.unfold(regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding(),Vbs);
01314                                                 unsigned short * us=bsetUShort.unfold (regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding());
01315                                                 bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set(us); 
01316                                                 if(localDebug) {
01317                                                         std::cout << " readback of ushort value that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].us << std::endl;
01318                                                         //for (int i=0; i < bsrdV[bsPVIdx].getNelem(); ++i) {
01319                                                         //      std::cout <<  bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[i].us << " [" << i << "] " ;
01320                                                         //}
01321                                                         //std::cout << std::endl;
01322                                                 }       
01323                                                 delete [] us;
01324                                                 break;
01325                                                 }
01326                                         case CAFE_BSHELPER::BS_INT8:
01327                                                 {
01328                                                 if ( bsrdV[bsPVIdx].getNelem()==1) {
01329                                                         bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set( bsetChar.unfoldScalar(regen_buffer, bsrdV[bsPVIdx].getEncoding()) );
01330                                                         if(localDebug)std::cout << " readback of char scalar that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].ch << std::endl;
01331                                                         break;
01332                                                 }
01333                                                 //std::vector<char> Vbs; 
01334                                                 //bsetCharV.unfold(regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding(),Vbs);
01335                                                 char * ch=bsetChar.unfold (regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding());                                              
01336                                                 bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set(ch); //set(ch); 
01337                                                 if(localDebug) {
01338                                                         std::cout << " readback of char value that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].ch << std::endl;
01339                                                         //for (int i=0; i < bsrdV[bsPVIdx].getNelem(); ++i) {
01340                                                         //      std::cout <<  bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[i].ch << " [" << i << "] " ;
01341                                                         //}
01342                                                         //std::cout << std::endl;
01343                                                 }
01344                                                 delete [] ch;
01345                                                 break;
01346                                                 }
01347                                         case CAFE_BSHELPER::BS_UINT8:
01348                                         case CAFE_BSHELPER::BS_BOOL:
01349                                                 {
01350                                                 if ( bsrdV[bsPVIdx].getNelem()==1) {
01351                                                         bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set( bsetUChar.unfoldScalar(regen_buffer, bsrdV[bsPVIdx].getEncoding()) );
01352                                                         if(localDebug)std::cout << " readback of unsigned char scalar that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].ch << std::endl;
01353                                                         break;
01354                                                 }
01355                                                 //std::vector<unsigned char> Vbs; 
01356                                                 //bsetUCharV.unfold(regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding(),Vbs);
01357                                                 unsigned char * uch=bsetUChar.unfold (regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding());   
01358                                                 bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set(uch); 
01359                                                 if(localDebug) {
01360                                                         std::cout << " readback of unsigned char value that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].ch << std::endl;
01361                                                         //for (int i=0; i < bsrdV[bsPVIdx].getNelem(); ++i) {
01362                                                         //      std::cout <<  bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[i].ch << " [" << i << "] " ;
01363                                                         //}
01364                                                         //std::cout << std::endl;
01365                                                 }       
01366                                                 delete [] uch;
01367                                                 break;
01368                                                 }
01369                                         default:
01370                                                 std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
01371                                                 std::cout << "ECAFE_INVALID_SWITCH_CASE" << std::endl;
01372                                                 print_out_by<by_bsID>(bsdt);
01373                                                 bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].setStatus(ECAFE_INVALID_SWITCH_CASE);
01374                                 }
01375                                 if (compressedFlag) {free(regen_buffer);}
01376                         }
01377                 } //top else
01378                 
01379                 
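01380                 // NOTE: the end-of-part bookkeeping below must also be copied in front of every `continue` in this loop,
01381                 
01382                 // because `continue` jumps straight back to the top of the while and would otherwise skip it.
01383                 // The sub-message counter is advanced here, at the end of the loop body.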
01384                 ++subMessage;
01385                 
01386                 //std::cout << "SUBMESSAGE " << subMessage << std::endl;
01387                 more=0;           //  Multipart detection
01388                 zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
01389                 zmq_msg_close (&message);
01390                 if (!more) {
01391                         if(localDebug)puts ("//END----------------------------------------//");
01392                         break;      //  Last message part
01393                 }
01394         
01395         } //while 1
01396         
01397 
01398 //First two messages are main header and data header
01399         if (subMessage>1) {
01400                 bsd.overallStatus=ICAFE_NORMAL;
01401                 bsd.setNChannels((subMessage-2)/2);
01402                 bsd.setNNullData(nZeroSize/2);
01403                 bsd.setPGoodData((subMessage-nZeroSize-2)*100/(std::max((subMessage-2),1)) );
01404                 if(localDebug) {
01405                         std::cout << " No of submessages: " << subMessage << std::endl;
01406                         std::cout << " of which 0 size have: " << nZeroSize << std::endl;
01407                         std::cout << " No of channels: " << bsd.getNChannels()<< std::endl;
01408                         std::cout << " of which 0 size have: " << bsd.getNNullData() << std::endl;
01409                         std::cout << " Percentage good: " << bsd.getPGoodData() <<std::endl;
01410                         
01411                         puts ("//------------------------------------END-------------------------------------//");
01412                         
01414                 }
01415         }
01416         
01417         return;
01418         
01419                         
01420 #undef __METHOD__
01421 }
01422 
01423 #endif  //  ZBSREAD_H

Generated on 28 May 2018 for CAFE by  doxygen 1.6.1