zbsDataHolders.h

Go to the documentation of this file.
00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 #ifndef ZBS_DATA_HOLDERS_H
00009 #define ZBS_DATA_HOLDERS_H
00010 
#include <config.h>

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <iterator>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

#include <PVDataHolder.h>
#include <statusCodes.h>
#include <handleHelper.h>
#include <global.h>
00017 
00018 
00019 #if HAVE_ZEROMQ
00020         #include <zmq.h>
00021         #if HAVE_JSON
00022                 #include <json/json.h>
00023         #endif
00024         #if HAVE_CURL
00025                 #include <curl/curl.h>          
00026         #endif
00027 #endif
00028 
// Forward declaration required so that CAFE can be named as a friend below
00030 class CAFE;
00031 
00032 namespace CAFEBS{
00033 
00035 //
00036 // BSChannel for configuration
00037 // BSDataHolder to keep data 
00038 //
00040 
00041 class BSChannel {
00042 friend class BSDataHolder;
00043 private:
00044         std::string name;
00045         std::string type;
00046         std::vector<int> shape;
00047         unsigned int nelem;
00048         std::string encoding;
00049         int offset;
00050         int modulo;
00051         std::string compression;
00052         bool BSEnabled;
00053         
00054         HandleHelper helper;
00055         
00056 public:
00057         //All data is placed to the PVDataHolder object
00058         //PVDataHolder pvd;
00059         //void setName(std::string _name)  {    name   =_name;}
00060         void setType(std::string _type) { type=_type;}
00061         
00062         void setOffset(int _offset)   {    offset =_offset;}
00063         void setModulo(int _modulo)   {    modulo =_modulo;}
00064         void setShape (std::vector<int> _shape)    { 
00065                 nelem=1;
00066                 shape.clear();
00067                 shape.reserve(_shape.size());   
00068                 shape.assign(_shape.begin(),_shape.end());  
00069                 for (size_t i=0; i<shape.size(); ++i) {
00070                         nelem=nelem * shape[i];
00071                 }
00072         }       
00073         
00074         void setEncoding(std::string _encoding) {encoding =_encoding;}
00075         void setCompression(std::string _compression) {compression = _compression;}
00076         void setBSEnabled(bool _bse) {BSEnabled=_bse;}
00077         
00078         std::string getName() {return name;}
00079         std::string getType() {return type;}
00080         int    getModulo() {return modulo;}
00081         int    getOffset() {return offset;}
00082         std::vector<int>  getShape() {return shape;}
00083         unsigned int getNelem() {return nelem;}
00084         std::string getEncoding()    {return encoding;}
00085         std::string getCompression() {return compression;}
00086         bool   isBSEnabled()    {return BSEnabled;}
00087         
00088         BSChannel(std::string _name):  offset(0), modulo(1) {
00089                 char pv[PVNAME_SIZE];
00090                 helper.removeLeadingAndTrailingSpaces(_name.c_str(), pv);
00091                 name=(std::string) pv;
00092                 shape.clear();
00093                 shape.push_back(1);
00094                 nelem=1;
00095                 encoding=std::string("little");
00096                 compression=std::string("none");
00097                 type=std::string("float64");
00098                 BSEnabled=true;
00099         };
00100         BSChannel(std::string _name, int _modulo): offset(0)    {
00101                 char pv[PVNAME_SIZE];
00102                 helper.removeLeadingAndTrailingSpaces(_name.c_str(), pv);
00103                 name=(std::string) pv; modulo=_modulo;
00104                 shape.clear();
00105                 shape.push_back(1);
00106                 nelem=1;
00107                 encoding=std::string("little");
00108                 compression=std::string("none");
00109                 type=std::string("float64");
00110                 BSEnabled=true;
00111         };
00112         BSChannel(std::string _name, int _modulo, int _offset) {
00113                 char pv[PVNAME_SIZE];
00114                 helper.removeLeadingAndTrailingSpaces(_name.c_str(), pv);
00115                 name=(std::string) pv; modulo=_modulo; offset=_offset;
00116                 shape.clear();
00117                 shape.push_back(1);
00118                 nelem=1;
00119                 encoding=std::string("little");
00120                 compression=std::string("none");
00121                 type=std::string("float64");
00122                 BSEnabled=true;
00123         };
00124 };
00125 
00126 
00127 
00128 class BSDataHolder{
00129 friend class ::CAFE; //Says CAFE is a member og the global namespace
00130 private:
00131         std::string htype;
00132         unsigned long long pulse_id;
00133         etsNorm global_timestamp;
00134         std::string hash;
00135         std::string dh_compression;
00136         
00137         std::vector<BSChannel> bsChannel;
00138         HandleHelper helper;
00139         
00140         std::vector<std::string>   pv;
00141         std::vector<unsigned int>  handle;
00142         
00143         bool isBS;
00144         bool BSInitialized;
00145         bool resourceConnected; // Used to confirm initial connection;
00146         
00147         unsigned int nPV;
00148         unsigned int nBSEnabled;
00149         
00150         unsigned int nChannels;
00151         unsigned int nNullData; 
00152         float pGoodData; //nChannels-nNulldata/nChannels
00153         
00154         void *context;
00155         void *receiver;
00156         int rc;
00157         unsigned short nhwm;  // high-water mark
00158         int  timeoutMS;      // timeout in ms; -1 is wait for ever      
00159         
00160         #if HAVE_JSON
00161                 Json::Value parsedFromString;
00162                 Json::Reader reader;
00163         #endif
00164         
00165         bool parsingSuccessful;
00166 
00167 public:
00168         ~BSDataHolder(){
00169          //std::cout << "BSDATAHOLDER DECONSTRUCTOR CALLED " << std::endl;
00170         
00171         };
00172         
00173         //delete [] pvd; resourceConnected=false; nBSEnabled=0;};
00174         
00175         BSDataHolder():isBS(true),BSInitialized(false),nBSEnabled(0) 
00176         {
00177                 //std::cout << "BSDATAHOLDER CONSTRUCTOR CALLED /" << std::endl;
00178                 //pvd = NULL;
00179                 overallStatus=ICAFE_NORMAL;
00180                 resourceConnected=false;
00181                 nhwm= BSREAD_ZEROMQ_HIGH_WATER_MARK;      // high-water mark
00182                 timeoutMS= BSREAD_ZEROMQ_TIMEOUT_MS;      // timeout in ms; -1 is wait for ever         
00183                 hash="undefined";
00184                 //htype="";
00185                 dh_compression="none";
00186                 global_timestamp.secPastEpoch=0;
00187                 global_timestamp.nsec=0;
00188                 pulse_id=0;
00189                 initCalled=false;
00190                 nPV=0;
00191                 nChannels=0;
00192           nNullData=0;
00193           pGoodData=0.0;
00194         
00195         }
00196         
00197         BSDataHolder(std::vector<std::string> _pv, std::vector<unsigned int> _handle):isBS(false),BSInitialized(false),nBSEnabled(0)
00198         {
00199                 pv.clear(); handle.clear();
00200                 pv.assign    (_pv.begin(),    _pv.end());
00201                 
00202                 bsChannel.clear();
00203                 for (size_t i=0; i< _pv.size(); ++i) {
00204                         bsChannel.push_back(BSChannel(_pv[i]));
00205                 }
00206                 
00207                 handle.assign(_handle.begin(),_handle.end());
00208                 pvd = new PVDataHolder[handle.size()];
00209                 
00210                 //Set nelem to native
00211                 for (unsigned int i=0; i< handle.size(); ++i) {
00212                         pvd[i].setNelem(helper.getNelemNative(handle[i]));
00213                 }
00214                 
00215                 nPV=_pv.size();
00216                 overallStatus=ICAFE_NORMAL;
00217                 resourceConnected=false;
00218                 nhwm= BSREAD_ZEROMQ_HIGH_WATER_MARK;      // high-water mark
00219                 timeoutMS= BSREAD_ZEROMQ_TIMEOUT_MS;      // timeout in ms; -1 is wait for ever         
00220                 hash="undefined";
00221                 dh_compression="none";
00222                 global_timestamp.secPastEpoch=0;
00223                 global_timestamp.nsec=0;
00224                 pulse_id=0;
00225                 initCalled=false;
00226                 nChannels=0;
00227           nNullData=0;  
00228           pGoodData=0.0;
00229         }
00230         
00231         BSDataHolder(std::vector<std::string> _pv):isBS(true),BSInitialized(false),nBSEnabled(0)
00232         {
00233                 
00234                 pv.clear(); 
00235                 pv.assign(_pv.begin(),_pv.end());       
00236                 
00237                 handle.clear();
00238                 handle.reserve(_pv.size());
00239                 
00240                 bsChannel.clear();
00241                 for (size_t i=0; i< _pv.size(); ++i) {
00242                         bsChannel.push_back(BSChannel(_pv[i]));
00243                         handle.push_back(0); // initialize handle to zero
00244                 }
00245                 
00246                 pvd = new PVDataHolder[_pv.size()];
00247                 
00248                 //for (int i=0; i< handle.size(); ++i) {                
00249                 //      pvd[i].setNelem(1);             
00250                 //}
00251                 
00252                 nPV=_pv.size(); 
00253                 overallStatus=ICAFE_NORMAL;     
00254                 resourceConnected=false;        
00255                 nhwm= BSREAD_ZEROMQ_HIGH_WATER_MARK;      // high-water mark
00256                 timeoutMS= BSREAD_ZEROMQ_TIMEOUT_MS;      // timeout in ms; -1 is wait for ever
00257                 hash="undefined";
00258                 dh_compression="none";
00259                 global_timestamp.secPastEpoch=0;
00260                 global_timestamp.nsec=0;
00261                 pulse_id=0;
00262                 initCalled=false;
00263                 nChannels=0;
00264           nNullData=0;  
00265           pGoodData=0.0;
00266         }
00267 
00268         void init(std::vector<std::string> _pv) {
00269                 
00270                 if (!initCalled) {
00271                         pv.clear(); 
00272                         pv.assign(_pv.begin(),_pv.end());                       
00273                         handle.clear();
00274                         handle.reserve(_pv.size());
00275                 
00276                         bsChannel.clear();
00277                         for (size_t i=0; i< _pv.size(); ++i) {
00278                                 bsChannel.push_back(BSChannel(_pv[i]));
00279                                 handle.push_back(0); // initialize handle to zero
00280                         }
00281                         pvd = new PVDataHolder[_pv.size()];             
00282                         nPV=_pv.size(); 
00283                         initCalled=true;
00284                         nBSEnabled=0;
00285                         BSInitialized=false;
00286                         isBS=false;
00287                         
00288                 }
00289                 
00290         }
00291         
00292 
00293         bool initCalled;
00294         PVDataHolder * pvd;
00295 
00296         std::string globalBSZmqStream;
00297 
00298         unsigned int getHWM(){return nhwm;}
00299         int  getTimeout()    {return timeoutMS;}
00300         void setHWM(unsigned short _nhwm)   {nhwm=_nhwm;}       //Will only be set into action at next (re-)connect or before BSInitialized
00301         void setTimeout(int _timeoutMS) {timeoutMS=_timeoutMS;} //Will only be set into action at next (re-)connect or before BSInitialized
00302 
00303         unsigned int getNPV(){return nPV;};
00304         unsigned int getNBSEnabled(){return nBSEnabled;};
00305 
00306         BSChannel getBSChannel(unsigned int idx); 
00307         BSChannel getBSChannel(std::string _name); 
00308          
00309         void setBSChannel(unsigned int idx, BSChannel bsc);
00310         void setBSChannel(BSChannel bsc); //Name defined so will find correct index
00311         
00312         void setBSModulo(std::string pv, int modulo) {
00313                 BSChannel bsc=getBSChannel(pv);
00314                 bsc.setModulo(modulo);
00315                 setBSChannel(bsc);
00316         }
00317         void setBSOffset(std::string pv, int offset) {
00318                 BSChannel bsc=getBSChannel(pv);
00319                 bsc.setOffset(offset);
00320                 setBSChannel(bsc);
00321         }
00322         void setBSModuloOffset(std::string pv, int modulo, int offset) {
00323                 BSChannel bsc=getBSChannel(pv);
00324                 bsc.setModulo(modulo);
00325                 bsc.setOffset(offset);
00326                 setBSChannel(bsc);
00327         }
00328         
00329         
00330         
00331         int getIdxFromName(std::string _name);
00332         
00333         void verifyIndex(unsigned int  idx); 
00334               
00335         bool isIndexOutOfRange (unsigned int  idx) {
00336                 return (idx >= nPV) ? true:false;
00337         }; 
00338         void printHeader(); 
00339         int overallStatus;
00340         void *subscriber;
00341         int timeout;
00342         
00343         bool isResourceConnected(){return resourceConnected;};
00344         void setResourceConnected(bool _rc){resourceConnected=_rc; return;};
00345         
00346         int getPVIdx(std::string _pv) {
00347         #define __METHOD__  "getPVIdx(std::string _pv)"
00348                 std::vector<std::string>::iterator it;
00349                 it = find (pv.begin(), pv.end(), _pv);
00350                 
00351                 if (it != pv.end()) {
00352                         //std::cout << "Element found in pv vector: " << *it << '\n' << std::endl;
00353                         //std::cout << "Distance is: " << std::distance(pv.begin(),it) << '\n' << std::endl;
00354                         return std::distance(pv.begin(),it);
00355                 }
00356                 else {
00357                         std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
00358                         std::cout << _pv << " element not found in pv vector\n" << std::endl;
00359                         return -1;
00360                 }
00361                 
00362                 /*
00363                 for (size_t i=0;  i< pv.size(); ++i) {   
00364                         if ( pv[i].compare(_pv) == 0) {                 
00365                                 return i;
00366                         }
00367                 }
00368                 */
00369         #undef __METHOD__
00370         }
00371         
00372         
00373         std::vector<std::string>   getPV() { return pv;}
00374         std::string getPV(unsigned int idx) throw(std::out_of_range){
00375                 if(isIndexOutOfRange(idx)) {
00376                         std::ostringstream oss;
00377                         oss << "Exception! Index " << idx
00378                                         << " to BSDataHolder method is out of range. Valid range is from 0 to " << nPV-1;
00379                         throw std::out_of_range(oss.str());
00380                 }
00381                 return pv[idx];
00382         }
00383         
00384         std::vector<unsigned int> getHandles(){return handle;}
00385         unsigned int getHandle(unsigned int idx) throw(std::out_of_range){
00386                 if(isIndexOutOfRange(idx)) {
00387                         std::ostringstream oss;
00388                         oss << "Exception! Index " << idx
00389                                         << " to BSDataHolder method is out of range. Valid range is from 0 to " << nPV-1;
00390                         throw std::out_of_range(oss.str());
00391                 }
00392         return handle[idx];
00393         }
00394 
00395         std::string getHtype() {return htype;}
00396         void setHtype(std::string _htype) {htype=_htype;}
00397  
00398         unsigned long long getPulse_id(){return pulse_id;}
00399         void setPulse_id(unsigned long long _pulse_id){pulse_id=_pulse_id; return;}
00400 
00401         etsNorm getGlobal_timestamp(){return global_timestamp;}
00402         
00403         void setGlobal_timestamp(unsigned int _sec, unsigned int _nsec){
00404                 global_timestamp.secPastEpoch=_sec;  
00405                 global_timestamp.nsec        =_nsec; 
00406                 return;         
00407         }
00408 
00409         std::string getHash() {return hash;}
00410         void setHash(std::string _hash) {hash=_hash;}
00411  
00412         std::string getDH_compression() {return  dh_compression;}
00413         void setDH_compression(std::string dhc) {dh_compression= dhc;}
00414 
00415   //Diagnostics
00416         void setNChannels(unsigned int _nc) {nChannels=_nc;}
00417         void setNNullData(unsigned int _nn) {nNullData=_nn;}
00418         void setPGoodData(float _pg) {pGoodData=_pg;}
00419         unsigned int getNChannels() {return nChannels;}
00420         unsigned int getNNullData() {return nNullData;}
00421         float getPGoodData() {return pGoodData;}
00422 
00423 
00424   std::vector<int> getStatusV() {
00425                 std::vector<int> V;
00426                 V.reserve(nPV);
00427                 for (size_t i=0; i<nPV; ++i){
00428                         V.push_back(pvd[i].getStatus());                        
00429                 }
00430                 return V;
00431         }
00432 
00433 
00434 
00435         std::vector<float> getAsFloatV() {
00436                 std::vector<float> V;
00437                 V.reserve(nPV);
00438 
00439                 for (size_t i=0; i<nPV; ++i){
00440                         V.push_back(pvd[i].getAsFloat());                       
00441                 }
00442                 return V;
00443         }
00444 
00445    
00446         int getAsFloatV(std::vector<float> &valueV, std::vector<int> &statusV) {
00447                 
00448                 valueV.clear();
00449                 statusV.clear();
00450                 valueV.reserve(nPV);
00451     statusV.reserve(nPV);
00452                         
00453                 for (size_t i=0; i<nPV; ++i){
00454                          valueV.push_back(pvd[i].getAsFloat());
00455                         statusV.push_back(pvd[i].getStatus());                  
00456                 }
00457                 return overallStatus;
00458         }
00459 
00460 
00461         std::vector<double> getAsDoubleV() {
00462                 std::vector<double> V;
00463                 V.reserve(nPV);
00464                 
00465                 //V.reserve(bsChannel.size());
00466                 //for (size_t i=0; i<bsChannel.size(); ++i){
00467                 for (size_t i=0; i<nPV; ++i){
00468                         //V.push_back(bsChannel[i].pvd.getAsDouble());
00469                         V.push_back(pvd[i].getAsDouble());                      
00470                 }
00471                 return V;
00472         }
00473 
00474    
00475         int getAsDoubleV(std::vector<double> &valueV, std::vector<int> &statusV) {
00476                 
00477                 valueV.clear();
00478                 statusV.clear();
00479                 valueV.reserve(nPV);
00480     statusV.reserve(nPV);
00481                         
00482                 for (size_t i=0; i<nPV; ++i){
00483                          valueV.push_back(pvd[i].getAsDouble());
00484                         statusV.push_back(pvd[i].getStatus());                  
00485                 }
00486                 return overallStatus;
00487         }
00488 
00489   
00490         std::vector<std::string> getAsStringV() {
00491                 std::vector<std::string> V;
00492                 V.reserve(nPV);
00493                 for (size_t i=0; i<nPV; ++i){           
00494                         V.push_back(pvd[i].getAsString());
00495                 }
00496                 return V;
00497         }
00498         
00499         int getAsStringV(std::vector<std::string> &valueV, std::vector<int> &statusV) {
00500                 
00501                 valueV.clear();
00502                 statusV.clear();
00503                 valueV.reserve(nPV);
00504     statusV.reserve(nPV);
00505                         
00506                 for (size_t i=0; i<nPV; ++i){
00507                          valueV.push_back(pvd[i].getAsString());
00508                         statusV.push_back(pvd[i].getStatus());                  
00509                 }
00510                 return overallStatus;
00511         }
00512 
00513         
00514 
00515         std::vector<int> getAsIntV() {
00516                 std::vector<int> V;
00517                 V.reserve(nPV);
00518                 for (size_t i=0; i<nPV; ++i){                   
00519                         V.push_back(pvd[i].getAsInt());
00520                 }
00521                 return V;
00522         }
00523 
00524         
00525         int getAsIntV(std::vector<int> &valueV, std::vector<int> &statusV) {
00526                 
00527                 valueV.clear();
00528                 statusV.clear();
00529                 valueV.reserve(nPV);
00530     statusV.reserve(nPV);
00531                         
00532                 for (size_t i=0; i<nPV; ++i){
00533                          valueV.push_back(pvd[i].getAsInt());
00534                         statusV.push_back(pvd[i].getStatus());                  
00535                 }
00536                 return overallStatus;
00537         }
00538         
00539 
00540 
00541 
00542         std::vector<double> getAttributeAsDoubleV(std::string attribute) {
00543                 std::vector<double> V;
00544                 V.reserve(nPV);
00545                 char pvAtt[PVNAME_SIZE];
00546                 helper.removeLeadingAndTrailingSpaces(attribute.c_str(), pvAtt);
00547                 for (size_t i=0; i<nPV; ++i){
00548                         if ( ((std::string)pvAtt).compare((std::string) pvd[i].getAttribute()) ==0){
00549                                 V.push_back(pvd[i].getAsDouble());
00550                         }
00551                 }
00552                 return V;
00553         }
00554 
00555         std::vector<PVDataHolder> getPVDataV() {
00556                 std::vector<PVDataHolder> V;
00557                 V.reserve(nPV);
00558                 for (size_t i=0; i<nPV; ++i){                   
00559                         V.push_back(pvd[i]);
00560                 }
00561                 return V;
00562         }
00563 
00564         PVDataHolder   getPVData(unsigned int idx) {
00565                 if (idx > (nPV-1)) {
00566                         idx=nPV-1;
00567                 }
00568                 return pvd[idx];
00569         }
00570 
00571 
00572         PVDataHolder   getPVData(std::string name) {
00573                 for (size_t i=0; i< nPV; ++i) {
00574                         if (bsChannel[i].getName().compare(name) ==0 ) {
00575                                 return pvd[i];
00576                         }
00577                 }
00578         }
00579 
00580 
00581         int getStatus() { return overallStatus;}
00582 
00583 
00584 //cannot declare member function to have static linkage; therefore this must remain in header file
00585         static size_t RecvResponseCallback(char * contents, size_t size, size_t nmemb, void  * up) {
00586 
00587                 ++nCBs;
00588                 std::cout << "Callback called: " << nCBs << std::endl;
00589                 std::cout << "SIZE No. of Bytes " << size*nmemb << std::endl;
00590 
00591                 std::string sLocal=contents;
00592 
00593                 //remove \n for newline
00594                 std::size_t found = sLocal.find('\n');
00595 
00596                 if (found != std::string::npos) {
00597                         sLocal=sLocal.substr(0, found);
00598                 }
00599 
00600                 contentsBS=contentsBS+sLocal;
00601                 
00602                 //std::cout << contentsBS << std::endl;
00603 
00604                 return (size_t) size * nmemb;
00605         }
00606 
00607 
00608 
00609 
00610 //cannot declare member function to have static linkage; therefore this must remain in header file
00611         static size_t RecvResponseCallbackLive(char * contents, size_t size, size_t nmemb, void  * up) {
00612 
00613                 callbackLiveFlag=true;
00614 
00615                 //std::cout << "SIZE No. of Bytes = " << size*nmemb << std::endl;
00616                 std::string sLocal=contents;
00617 
00618                 //remove \n for newline
00619                 std::size_t found = sLocal.find('\n');
00620 
00621                 if (found != std::string::npos) {
00622                         sLocal=sLocal.substr(0, found);
00623                 }
00624                 
00625                 //std::cout << sLocal << std::endl;
00626                 //std::cout << " Value of callback Live from callback fn is " << callbackLiveFlag << std::endl; 
00627 
00628                 return (size_t) size * nmemb;
00629         }
00630 
00631         int reconnect(); 
00632 
00633         bool setBS(bool BSFlag); 
00634         
00635         bool resetBS() {
00636                 closeBS();
00637                 return setBS(true);     
00638         }
00639 
00640         bool setCA(bool CAFlag) {       
00641                 return CAFlag;
00642         }
00643         
00644         void closeBS() {
00645                 if (BSInitialized && isBS) {
00646                         #if HAVE_ZEROMQ
00647                         zmq_close (subscriber); 
00648                         zmq_ctx_destroy (context);
00649                         #endif
00650                         delete [] pvd;
00651                 }
00652                 
00653                 BSInitialized=false;
00654                 isBS=false;
00655                 initCalled=false;
00656                 return;
00657         }
00658         
00659         bool getIsBS() { return isBS;}
00660 
00661 
00662 };
00663 
00664 
00665 
00667 //
00668 // Digital BPM Holder 
00669 //
00671 
00672 class DBPMData{
00673 friend class ::CAFE;
00674 private:
00675         double val;
00676         epicsTimeStamp ets;
00677         int status;
00678 public:
00679         double getValue() {return val;}
00680         epicsTimeStamp getEpicsTimeStamp() {return ets;}
00681         int getStatus() { return status;}       
00682         
00683         DBPMData(){
00684         //status=ECAFE_BPM_DATA_IS_INVALID;     
00685         };
00686 };
00687 
00688 
00689 class DBPMKeeper
00690 {
00691 friend class ::CAFE;
00692 private:
00693         std::vector<DBPMData> x;
00694         std::vector<DBPMData> y;
00695         std::vector<DBPMData> q;
00696         std::vector<DBPMData> energy;
00697         
00698         std::vector<double> offs_x;
00699         std::vector<double> offs_y;
00700 
00701         unsigned long long pulse_id;
00702         
00703         bool isAllXOK;
00704         bool isAllYOK;
00705         bool isAllQOK;
00706         bool isAllEOK;
00707         bool isAllOK;
00708         
00709         std::vector<std::string>  pv;
00710         std::vector<unsigned int> handle;
00711         std::vector<std::string>  device;
00712         std::vector<float>    s; 
00713                 
00714         size_t nDBPM;
00715         size_t nPV;
00716                         
00717         bool isBS;
00718         bool BSInitialized;
00719         void *context; 
00720   
00721         void *receiver;
00722         int rc;
00723 
00724         #if HAVE_JSON
00725         Json::Value parsedFromString;
00726         Json::Reader reader;
00727         #endif
00728         bool parsingSuccessful;
00729 
00730 public: 
00731 
00732         std::vector<DBPMData> getX() { return x;}
00733         std::vector<DBPMData> getY() { return y;}
00734         std::vector<DBPMData> getQ() { return q;}
00735         std::vector<DBPMData> getEnergy() { return energy;}
00736         
00737         
00738         std::vector<double> getOffsetX() { return offs_x;}
00739         std::vector<double> getOffsetY() { return offs_y;}
00740         
00741         bool getIsAllXOK() {return isAllXOK;}
00742         bool getIsAllYOK() {return isAllYOK;}
00743         bool getIsAllQOK() {return isAllQOK;}   
00744         bool getIsAllEOK() {return isAllEOK;}   
00745         bool getIsAllOK()  {return isAllOK;}    
00746                 
00747         std::vector<std::string>  getPV(){ return pv;}
00748         std::vector<unsigned int> getHandle() { return handle;}
00749         std::vector<std::string>  getDevice() { return device;}
00750         std::vector<float>        getS() { return s;} 
00751         size_t getNDBPM() {return nDBPM;}
00752         size_t getNPV()   {return nPV;}
00753         int getStatus()   {return status;}
00754         
00755         
00756         int getPVIdx(std::string _pv) {                         
00757                 for (size_t i=0;  i< pv.size(); ++i) {   
00758                         if ( pv[i].compare(_pv) == 0) {                 
00759                                 return i;
00760                         }               
00761                 }
00762                 return -1;
00763         }
00764         
00765         
00766         unsigned long long getPulse_id(){return pulse_id;}
00767         void setPulse_id(unsigned long long _pulse_id){pulse_id=_pulse_id;}
00768         
00769         PVDataHolder * pvd;
00770         int status;
00771                                 
00772         size_t xIdx;
00773         size_t yIdx;
00774         size_t qIdx;
00775         size_t xValidIdx;
00776         size_t yValidIdx;
00777         size_t qValidIdx;
00778         size_t energyIdx;
00779         size_t endIdx;   
00780         
00781         void *subscriber;
00782 
00783         static size_t RecvResponseCallback(char * contents, size_t size, size_t nmemb, void  * up) {
00784 
00785                 ++nCBs;
00786                 //std::cout << "Callback called: " << nCBs << std::endl;
00787                 //std::cout << "SIZE No. of Bytes " << size*nmemb << std::endl;
00788 
00789                 std::string sLocal=contents;
00790 
00791                 //remove \n for newline
00792                 std::size_t found = sLocal.find('\n');
00793 
00794                 if (found != std::string::npos) {
00795 
00796                         sLocal=sLocal.substr(0, found);
00797                 }
00798 
00799                 contentsS=contentsS+sLocal;
00800                 
00801                 return (size_t) size * nmemb;
00802         }
00803 
00804         
00805 
00806         bool setBS(bool BSFlag) {
00807 
00808         if(MUTEX){cafeMutex.lock();}
00809 
00810         if (BSFlag) {
00811                 #if HAVE_CURL
00812 
00813                 std::string dataChannels=std::string("{\"channels\":[");
00814                 std::vector<std::string> pvNew=pv;
00815 
00816                 #if HAVE_ZEROMQ
00817 
00818                 if (!BSInitialized) {
00819 
00820                         size_t found;
00821                         dataChannels= dataChannels + std::string("{\"name\":\"");
00822                         dataChannels= dataChannels + pvNew[0];
00823                         //dataChannels= dataChannels + std::string("\",\"backend\":\"sf-databuffer\"}"          );
00824                         dataChannels= dataChannels + std::string("\",\"backend\":\"sf-databuffer\",\"modulo\":1,\"offset\":0}" );
00825 
00826                                 
00827                         for (size_t i=1; i < pvNew.size(); ++i) {
00828                                         
00829                                 found = pvNew[i].find("SARUN08-DBPM210");
00830                                 if (found != std::string::npos) continue;
00831                                 found = pvNew[i].find("SARUN08-DBPM410");
00832                                 if (found != std::string::npos) continue;
00833 
00834                                 found = pvNew[i].find("ENERGY");
00835                                 if (found != std::string::npos) continue;
00836                                 
00837                         
00838                                 dataChannels= dataChannels + std::string(",{\"name\":\"");
00839                                 dataChannels= dataChannels + pvNew[i];
00840 
00841                                 dataChannels= dataChannels + std::string("\",\"backend\":\"sf-databuffer\",\"modulo\":1,\"offset\":0}");
00842                                         
00843                         }
00844 
00845                         dataChannels= dataChannels + std::string("],");
00846                         dataChannels= dataChannels + "\"mapping\":{\"incomplete\":\"fill-null\"},\"channelValidation\":{\"inconsistency\":\"keep-as-is\"},\"sendBehaviour\":{\"strategy\":\"complete-all\"}}";
00847 
00848                         //std::cout <<  dataChannels << std::endl;
00849 
00850                         const char * data = dataChannels.c_str();
00851 
00852                         //std::cout << "SIZE OF DATA --------------->"  << sizeof(data) << std::endl;
00853 
00854                         CURL *curl;
00855                         CURLcode res;
00856                         struct curl_slist * slist;
00857                         slist = NULL;
00858                         
00859                         slist = curl_slist_append(slist, "Content-Type: application/json");
00860                 
00861                         curl_global_init(CURL_GLOBAL_ALL);
00862                                                                 
00863                         curl = curl_easy_init();
00864                                                         
00865                         if (curl) {
00866                         
00867                                 curl_easy_setopt(curl, CURLOPT_URL, "https://dispatcher-api.psi.ch/sf/stream");
00868                         
00869                                 curl_easy_setopt(curl, CURLOPT_POSTFIELDS, data); //"-F file=@./dbpm.json"); //data); //
00870                         
00871                                 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, slist);
00872                                 curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
00873                                 
00874                                 //std::cout << "WAITING FOR CALLBACK... " << std::endl;
00875 
00876                                 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &RecvResponseCallback);
00877 
00878                                 res = curl_easy_perform(curl);
00879                                 
00880                                 if (res != CURLE_OK) {
00881                                         std::cout << "curl_easy_perform failed " << curl_easy_strerror(res) << std::endl;
00882                                 }
00883                                 else {
00884                                         std::cout << " CALLBACK DONE" << std::endl;
00885 
00886                                         curl_easy_cleanup(curl);
00887 
00888                                         curl_slist_free_all(slist);
00889 
00890                                         slist=NULL;
00891                                 }
00892                         }//if curl
00893                         
00894                         std::cout << "++++++++++++++++++++++++++++++++++++++++++++++++++++++++++" << std::endl;
00895                         curl_global_cleanup();
00896                         
00897 
00898                         //std::cout << " //1// SHOW contentS " << std::endl;
00899                         //std::cout << contentsS.c_str() << std::endl;
00900 
00901                         Json::Value parsedFromString;
00902                         Json::Reader reader;
00903                         bool parsingSuccessful;
00904 
00905                         Json::FastWriter fastWriter;
00906                         std::string globalZmqStream;
00907 
00908 
00909                         //printf("value= %s\n", contentsS.c_str());
00910                         
00911                         if (contentsS.size() > 2) {
00912                                 parsingSuccessful=reader.parse(contentsS.c_str(), parsedFromString);
00913                                 if (parsingSuccessful) {
00914                                         //Json::StyledWriter styledWriter;
00915                                         std::cout << "STYLED: --------------------------------" << std::endl;
00916                                         //std::cout << styledWriter.write(parsedFromString) << std::endl;
00917                                         //std::cout << "----------------------------------" << std::endl;
00918                                         std::cout << parsedFromString["stream"] << std::endl;
00919 
00920                                         std::cout << "----------------------------------" << std::endl;
00921                                         globalZmqStream = fastWriter.write(parsedFromString["stream"]).c_str();
00922                                         std::cout << globalZmqStream << std::endl;
00923 
00924                                         if ( parsedFromString["stream"].isNull() ) {
00925                                                 globalZmqStream.clear();
00926                                         }
00927                                 }
00928                                 else {
00929                                         std::cout << "PARSING IN CURL CALLBACK FUNCTION WAS UNSUCCESSFUL !!!" << std::endl;
00930                                         std::cout << contentsS.c_str() << std::endl;
00931                                         std::cout << reader.getFormattedErrorMessages() << std::endl;
00932 
00933                                 }
00934                         }       
00935                                                 
00936                         if (globalZmqStream.empty()) {
00937                                  std::cout << "BS Data is not available " << std::endl;
00938                                  if(MUTEX){cafeMutex.unlock();}
00939                                  return isBS=false;
00940                         }
00941 
00942                         context = zmq_ctx_new ();
00943                         
00945                         //HWM has no effect for PULL
00946                         //See documentation on zmq-socket
00947                         //WHEN PUSH Sender reachers HWM, then it blocks
00950                         //                      rc = zmq_bind (receiver, "tcp://129.129.145.206:5558"); //ZMQ_PULL
00952                                                 
00953                         subscriber = zmq_socket (context, ZMQ_SUB);
00954                         //rc = zmq_connect (subscriber, "tcp://129.129.145.206:5556");
00955                         //rc = zmq_connect (subscriber, "tcp://SIN-CVME-DBPM0421:9000");
00956                                                                         
00957                         globalZmqStream=globalZmqStream.substr(1,globalZmqStream.size()-3);
00958                         //std::cout << " globalZmqStream.c_str() " << globalZmqStream.c_str() << std::endl;
00959 
00960                         rc = zmq_connect (subscriber, (const char *) globalZmqStream.c_str());   //"tcp://sf-daqbuf-30.psi.ch:39927");
00961                                 
00962                         if (rc != 0 ) {
00963                                 std::cout << " Error is " << zmq_errno() << " " << zmq_strerror(zmq_errno()) << std::endl;
00964                         }
00965                                 
00966                         //rc = zmq_connect (subscriber, "tcp://*:9999");
00967                         //assert (rc == 0);
00968                         
00969                         int nhwm=1;
00970                         int timeoutMS=200; //10; //-1 Wait for Ever
00971                                 
00972                 
00973                                 
00974                         rc=zmq_setsockopt (subscriber,ZMQ_RCVHWM, &nhwm, sizeof(int));
00975                         rc=zmq_setsockopt (subscriber,ZMQ_SNDHWM, &nhwm, sizeof(int));  
00976                         //assert (rc == 0);
00977                                 
00978                         rc=zmq_setsockopt (subscriber,ZMQ_RCVTIMEO, &timeoutMS, sizeof(int));
00979                         //assert (rc == 0);
00980                                                 
00981                         rc=zmq_setsockopt (subscriber,ZMQ_SUBSCRIBE,"",0);
00982                         //assert (rc == 0);
00983                                                         
00984                         BSInitialized=true;
00985 
00986                 }//is BS initialized
00987 
00988                 #endif //have zeromq
00989 
00990                 if(MUTEX){cafeMutex.unlock();}
00991                 return isBS=BSFlag;
00992                 #else //have curl
00993                 
00994                 if(MUTEX){cafeMutex.unlock();}
00995                 return isBS=false;
00996                 #endif //have curl
00997         }//isBSFlag
00998                  
00999         if(MUTEX){cafeMutex.unlock();}
01000         return isBS=BSFlag;
01001         } // setBS
01002         
01003 
01004   
01005 
01006         bool resetBS() {
01007                 closeBS();
01008                 return setBS(true);     
01009         }
01010 
01011 
01012         bool setCA(bool CAFlag) {       
01013                 return CAFlag;
01014         }
01015         
01016         void closeBS() {
01017                 if (BSInitialized && isBS) {
01018                   #if HAVE_ZEROMQ
01019                         zmq_close (subscriber); 
01020         zmq_ctx_destroy (context);
01021                         #endif
01022                 }
01023                 BSInitialized=false;
01024                 isBS=false;
01025         }
01026         
01027         bool getIsBS() { return isBS;}
01028                 
01029         
01030         DBPMKeeper() {};
01031                 
01032         DBPMKeeper(std::vector<std::string> _pv, std::vector<unsigned int> _handle, std::map<float, std::string> posDev):isBS(false),BSInitialized(false)
01033         {
01034                 
01035                 pv.assign    (_pv.begin(),    _pv.end());
01036                 handle.assign(_handle.begin(),_handle.end());
01037                 
01038         
01039                 //fMap posDev;
01040                 std::map<float, std::string>::iterator pos;
01041                 for (pos =posDev.begin(); pos != posDev.end(); ++pos) {
01042                         s.push_back(pos->first);  device.push_back(pos->second);                                        
01043                 }               
01044         
01045                 pvd = new PVDataHolder[handle.size()];
01046                                 
01047                 //for (int i=0; i< handle.size(); ++i) {                
01048                 //      pvd[i].setNelem(1);             
01049                 //}
01050                 
01051                 nDBPM=device.size();
01052                 nPV=_pv.size(); 
01053                 status=ICAFE_NORMAL;    
01054                                         
01055                 xIdx     =  0;
01056                 yIdx     =  nDBPM;
01057                 qIdx     =2*nDBPM;
01058                 xValidIdx=3*nDBPM;
01059                 yValidIdx=4*nDBPM;
01060                 qValidIdx=5*nDBPM;
01061                 energyIdx=6*nDBPM;
01062                 endIdx   =7*nDBPM;      
01063         }       
01064         
01065                 
01066         DBPMKeeper(std::vector<std::string> _pv, std::vector<unsigned int> _handle, std::vector<std::string> _dev,  std::vector<float> _pos):isBS(false),BSInitialized(false)
01067         {
01068                 
01069                 pv.assign    (_pv.begin(),    _pv.end());
01070                 handle.assign(_handle.begin(),_handle.end());
01071                                                         
01072                 device.assign(_dev.begin(), _dev.end());
01073                 s.assign(_pos.begin(), _pos.end());
01074         
01075                 pvd = new PVDataHolder[handle.size()];
01076                 
01077                 //for (int i=0; i< handle.size(); ++i) {                
01078                 //      pvd[i].setNelem(1);             
01079                 //}
01080                 
01081                 nDBPM=device.size();
01082                 nPV=_pv.size(); 
01083                 status=ICAFE_NORMAL;    
01084                                         
01085                 xIdx     =  0;
01086                 yIdx     =  nDBPM;
01087                 qIdx     =2*nDBPM;
01088                 xValidIdx=3*nDBPM;
01089                 yValidIdx=4*nDBPM;
01090                 qValidIdx=5*nDBPM;
01091                 energyIdx=6*nDBPM;
01092                 endIdx   =7*nDBPM;      
01093         }       
01094         
01095 };
01096 
01097 
01098 }; //namespace
01099 
01100 
01101 #endif //ZBS_DATA_HOLDERS_H

Generated on 28 May 2018 for CAFE by  doxygen 1.6.1