00001
00002
00003
00004
00005
00006
00007
00008 #ifndef ZBS_DATA_HOLDERS_H
00009 #define ZBS_DATA_HOLDERS_H
00010
00011 #include <config.h>
00012 #include <vector>
00013 #include <PVDataHolder.h>
00014 #include <statusCodes.h>
00015 #include <handleHelper.h>
00016 #include <global.h>
00017
00018
00019 #if HAVE_ZEROMQ
00020 #include <zmq.h>
00021 #if HAVE_JSON
00022 #include <json/json.h>
00023 #endif
00024 #if HAVE_CURL
00025 #include <curl/curl.h>
00026 #endif
00027 #endif
00028
00029
00030 class CAFE;
00031
00032 namespace CAFEBS{
00033
00035
00036
00037
00038
00040
00041 class BSChannel {
00042 friend class BSDataHolder;
00043 private:
00044 std::string name;
00045 std::string type;
00046 std::vector<int> shape;
00047 unsigned int nelem;
00048 std::string encoding;
00049 int offset;
00050 int modulo;
00051 std::string compression;
00052 bool BSEnabled;
00053
00054 HandleHelper helper;
00055
00056 public:
00057
00058
00059
00060 void setType(std::string _type) { type=_type;}
00061
00062 void setOffset(int _offset) { offset =_offset;}
00063 void setModulo(int _modulo) { modulo =_modulo;}
00064 void setShape (std::vector<int> _shape) {
00065 nelem=1;
00066 shape.clear();
00067 shape.reserve(_shape.size());
00068 shape.assign(_shape.begin(),_shape.end());
00069 for (size_t i=0; i<shape.size(); ++i) {
00070 nelem=nelem * shape[i];
00071 }
00072 }
00073
00074 void setEncoding(std::string _encoding) {encoding =_encoding;}
00075 void setCompression(std::string _compression) {compression = _compression;}
00076 void setBSEnabled(bool _bse) {BSEnabled=_bse;}
00077
00078 std::string getName() {return name;}
00079 std::string getType() {return type;}
00080 int getModulo() {return modulo;}
00081 int getOffset() {return offset;}
00082 std::vector<int> getShape() {return shape;}
00083 unsigned int getNelem() {return nelem;}
00084 std::string getEncoding() {return encoding;}
00085 std::string getCompression() {return compression;}
00086 bool isBSEnabled() {return BSEnabled;}
00087
00088 BSChannel(std::string _name): offset(0), modulo(1) {
00089 char pv[PVNAME_SIZE];
00090 helper.removeLeadingAndTrailingSpaces(_name.c_str(), pv);
00091 name=(std::string) pv;
00092 shape.clear();
00093 shape.push_back(1);
00094 nelem=1;
00095 encoding=std::string("little");
00096 compression=std::string("none");
00097 type=std::string("float64");
00098 BSEnabled=true;
00099 };
00100 BSChannel(std::string _name, int _modulo): offset(0) {
00101 char pv[PVNAME_SIZE];
00102 helper.removeLeadingAndTrailingSpaces(_name.c_str(), pv);
00103 name=(std::string) pv; modulo=_modulo;
00104 shape.clear();
00105 shape.push_back(1);
00106 nelem=1;
00107 encoding=std::string("little");
00108 compression=std::string("none");
00109 type=std::string("float64");
00110 BSEnabled=true;
00111 };
00112 BSChannel(std::string _name, int _modulo, int _offset) {
00113 char pv[PVNAME_SIZE];
00114 helper.removeLeadingAndTrailingSpaces(_name.c_str(), pv);
00115 name=(std::string) pv; modulo=_modulo; offset=_offset;
00116 shape.clear();
00117 shape.push_back(1);
00118 nelem=1;
00119 encoding=std::string("little");
00120 compression=std::string("none");
00121 type=std::string("float64");
00122 BSEnabled=true;
00123 };
00124 };
00125
00126
00127
00128 class BSDataHolder{
00129 friend class ::CAFE;
00130 private:
00131 std::string htype;
00132 unsigned long long pulse_id;
00133 etsNorm global_timestamp;
00134 std::string hash;
00135 std::string dh_compression;
00136
00137 std::vector<BSChannel> bsChannel;
00138 HandleHelper helper;
00139
00140 std::vector<std::string> pv;
00141 std::vector<unsigned int> handle;
00142
00143 bool isBS;
00144 bool BSInitialized;
00145 bool resourceConnected;
00146
00147 unsigned int nPV;
00148 unsigned int nBSEnabled;
00149
00150 unsigned int nChannels;
00151 unsigned int nNullData;
00152 float pGoodData;
00153
00154 void *context;
00155 void *receiver;
00156 int rc;
00157 unsigned short nhwm;
00158 int timeoutMS;
00159
00160 #if HAVE_JSON
00161 Json::Value parsedFromString;
00162 Json::Reader reader;
00163 #endif
00164
00165 bool parsingSuccessful;
00166
00167 public:
00168 ~BSDataHolder(){
00169
00170
00171 };
00172
00173
00174
00175 BSDataHolder():isBS(true),BSInitialized(false),nBSEnabled(0)
00176 {
00177
00178
00179 overallStatus=ICAFE_NORMAL;
00180 resourceConnected=false;
00181 nhwm= BSREAD_ZEROMQ_HIGH_WATER_MARK;
00182 timeoutMS= BSREAD_ZEROMQ_TIMEOUT_MS;
00183 hash="undefined";
00184
00185 dh_compression="none";
00186 global_timestamp.secPastEpoch=0;
00187 global_timestamp.nsec=0;
00188 pulse_id=0;
00189 initCalled=false;
00190 nPV=0;
00191 nChannels=0;
00192 nNullData=0;
00193 pGoodData=0.0;
00194
00195 }
00196
00197 BSDataHolder(std::vector<std::string> _pv, std::vector<unsigned int> _handle):isBS(false),BSInitialized(false),nBSEnabled(0)
00198 {
00199 pv.clear(); handle.clear();
00200 pv.assign (_pv.begin(), _pv.end());
00201
00202 bsChannel.clear();
00203 for (size_t i=0; i< _pv.size(); ++i) {
00204 bsChannel.push_back(BSChannel(_pv[i]));
00205 }
00206
00207 handle.assign(_handle.begin(),_handle.end());
00208 pvd = new PVDataHolder[handle.size()];
00209
00210
00211 for (unsigned int i=0; i< handle.size(); ++i) {
00212 pvd[i].setNelem(helper.getNelemNative(handle[i]));
00213 }
00214
00215 nPV=_pv.size();
00216 overallStatus=ICAFE_NORMAL;
00217 resourceConnected=false;
00218 nhwm= BSREAD_ZEROMQ_HIGH_WATER_MARK;
00219 timeoutMS= BSREAD_ZEROMQ_TIMEOUT_MS;
00220 hash="undefined";
00221 dh_compression="none";
00222 global_timestamp.secPastEpoch=0;
00223 global_timestamp.nsec=0;
00224 pulse_id=0;
00225 initCalled=false;
00226 nChannels=0;
00227 nNullData=0;
00228 pGoodData=0.0;
00229 }
00230
00231 BSDataHolder(std::vector<std::string> _pv):isBS(true),BSInitialized(false),nBSEnabled(0)
00232 {
00233
00234 pv.clear();
00235 pv.assign(_pv.begin(),_pv.end());
00236
00237 handle.clear();
00238 handle.reserve(_pv.size());
00239
00240 bsChannel.clear();
00241 for (size_t i=0; i< _pv.size(); ++i) {
00242 bsChannel.push_back(BSChannel(_pv[i]));
00243 handle.push_back(0);
00244 }
00245
00246 pvd = new PVDataHolder[_pv.size()];
00247
00248
00249
00250
00251
00252 nPV=_pv.size();
00253 overallStatus=ICAFE_NORMAL;
00254 resourceConnected=false;
00255 nhwm= BSREAD_ZEROMQ_HIGH_WATER_MARK;
00256 timeoutMS= BSREAD_ZEROMQ_TIMEOUT_MS;
00257 hash="undefined";
00258 dh_compression="none";
00259 global_timestamp.secPastEpoch=0;
00260 global_timestamp.nsec=0;
00261 pulse_id=0;
00262 initCalled=false;
00263 nChannels=0;
00264 nNullData=0;
00265 pGoodData=0.0;
00266 }
00267
00268 void init(std::vector<std::string> _pv) {
00269
00270 if (!initCalled) {
00271 pv.clear();
00272 pv.assign(_pv.begin(),_pv.end());
00273 handle.clear();
00274 handle.reserve(_pv.size());
00275
00276 bsChannel.clear();
00277 for (size_t i=0; i< _pv.size(); ++i) {
00278 bsChannel.push_back(BSChannel(_pv[i]));
00279 handle.push_back(0);
00280 }
00281 pvd = new PVDataHolder[_pv.size()];
00282 nPV=_pv.size();
00283 initCalled=true;
00284 nBSEnabled=0;
00285 BSInitialized=false;
00286 isBS=false;
00287
00288 }
00289
00290 }
00291
00292
00293 bool initCalled;
00294 PVDataHolder * pvd;
00295
00296 std::string globalBSZmqStream;
00297
00298 unsigned int getHWM(){return nhwm;}
00299 int getTimeout() {return timeoutMS;}
00300 void setHWM(unsigned short _nhwm) {nhwm=_nhwm;}
00301 void setTimeout(int _timeoutMS) {timeoutMS=_timeoutMS;}
00302
00303 unsigned int getNPV(){return nPV;};
00304 unsigned int getNBSEnabled(){return nBSEnabled;};
00305
00306 BSChannel getBSChannel(unsigned int idx);
00307 BSChannel getBSChannel(std::string _name);
00308
00309 void setBSChannel(unsigned int idx, BSChannel bsc);
00310 void setBSChannel(BSChannel bsc);
00311
00312 void setBSModulo(std::string pv, int modulo) {
00313 BSChannel bsc=getBSChannel(pv);
00314 bsc.setModulo(modulo);
00315 setBSChannel(bsc);
00316 }
00317 void setBSOffset(std::string pv, int offset) {
00318 BSChannel bsc=getBSChannel(pv);
00319 bsc.setOffset(offset);
00320 setBSChannel(bsc);
00321 }
00322 void setBSModuloOffset(std::string pv, int modulo, int offset) {
00323 BSChannel bsc=getBSChannel(pv);
00324 bsc.setModulo(modulo);
00325 bsc.setOffset(offset);
00326 setBSChannel(bsc);
00327 }
00328
00329
00330
00331 int getIdxFromName(std::string _name);
00332
00333 void verifyIndex(unsigned int idx);
00334
00335 bool isIndexOutOfRange (unsigned int idx) {
00336 return (idx >= nPV) ? true:false;
00337 };
00338 void printHeader();
00339 int overallStatus;
00340 void *subscriber;
00341 int timeout;
00342
00343 bool isResourceConnected(){return resourceConnected;};
00344 void setResourceConnected(bool _rc){resourceConnected=_rc; return;};
00345
00346 int getPVIdx(std::string _pv) {
00347 #define __METHOD__ "getPVIdx(std::string _pv)"
00348 std::vector<std::string>::iterator it;
00349 it = find (pv.begin(), pv.end(), _pv);
00350
00351 if (it != pv.end()) {
00352
00353
00354 return std::distance(pv.begin(),it);
00355 }
00356 else {
00357 std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
00358 std::cout << _pv << " element not found in pv vector\n" << std::endl;
00359 return -1;
00360 }
00361
00362
00363
00364
00365
00366
00367
00368
00369 #undef __METHOD__
00370 }
00371
00372
00373 std::vector<std::string> getPV() { return pv;}
00374 std::string getPV(unsigned int idx) throw(std::out_of_range){
00375 if(isIndexOutOfRange(idx)) {
00376 std::ostringstream oss;
00377 oss << "Exception! Index " << idx
00378 << " to BSDataHolder method is out of range. Valid range is from 0 to " << nPV-1;
00379 throw std::out_of_range(oss.str());
00380 }
00381 return pv[idx];
00382 }
00383
00384 std::vector<unsigned int> getHandles(){return handle;}
00385 unsigned int getHandle(unsigned int idx) throw(std::out_of_range){
00386 if(isIndexOutOfRange(idx)) {
00387 std::ostringstream oss;
00388 oss << "Exception! Index " << idx
00389 << " to BSDataHolder method is out of range. Valid range is from 0 to " << nPV-1;
00390 throw std::out_of_range(oss.str());
00391 }
00392 return handle[idx];
00393 }
00394
00395 std::string getHtype() {return htype;}
00396 void setHtype(std::string _htype) {htype=_htype;}
00397
00398 unsigned long long getPulse_id(){return pulse_id;}
00399 void setPulse_id(unsigned long long _pulse_id){pulse_id=_pulse_id; return;}
00400
00401 etsNorm getGlobal_timestamp(){return global_timestamp;}
00402
00403 void setGlobal_timestamp(unsigned int _sec, unsigned int _nsec){
00404 global_timestamp.secPastEpoch=_sec;
00405 global_timestamp.nsec =_nsec;
00406 return;
00407 }
00408
00409 std::string getHash() {return hash;}
00410 void setHash(std::string _hash) {hash=_hash;}
00411
00412 std::string getDH_compression() {return dh_compression;}
00413 void setDH_compression(std::string dhc) {dh_compression= dhc;}
00414
00415
00416 void setNChannels(unsigned int _nc) {nChannels=_nc;}
00417 void setNNullData(unsigned int _nn) {nNullData=_nn;}
00418 void setPGoodData(float _pg) {pGoodData=_pg;}
00419 unsigned int getNChannels() {return nChannels;}
00420 unsigned int getNNullData() {return nNullData;}
00421 float getPGoodData() {return pGoodData;}
00422
00423
00424 std::vector<int> getStatusV() {
00425 std::vector<int> V;
00426 V.reserve(nPV);
00427 for (size_t i=0; i<nPV; ++i){
00428 V.push_back(pvd[i].getStatus());
00429 }
00430 return V;
00431 }
00432
00433
00434
00435 std::vector<float> getAsFloatV() {
00436 std::vector<float> V;
00437 V.reserve(nPV);
00438
00439 for (size_t i=0; i<nPV; ++i){
00440 V.push_back(pvd[i].getAsFloat());
00441 }
00442 return V;
00443 }
00444
00445
00446 int getAsFloatV(std::vector<float> &valueV, std::vector<int> &statusV) {
00447
00448 valueV.clear();
00449 statusV.clear();
00450 valueV.reserve(nPV);
00451 statusV.reserve(nPV);
00452
00453 for (size_t i=0; i<nPV; ++i){
00454 valueV.push_back(pvd[i].getAsFloat());
00455 statusV.push_back(pvd[i].getStatus());
00456 }
00457 return overallStatus;
00458 }
00459
00460
00461 std::vector<double> getAsDoubleV() {
00462 std::vector<double> V;
00463 V.reserve(nPV);
00464
00465
00466
00467 for (size_t i=0; i<nPV; ++i){
00468
00469 V.push_back(pvd[i].getAsDouble());
00470 }
00471 return V;
00472 }
00473
00474
00475 int getAsDoubleV(std::vector<double> &valueV, std::vector<int> &statusV) {
00476
00477 valueV.clear();
00478 statusV.clear();
00479 valueV.reserve(nPV);
00480 statusV.reserve(nPV);
00481
00482 for (size_t i=0; i<nPV; ++i){
00483 valueV.push_back(pvd[i].getAsDouble());
00484 statusV.push_back(pvd[i].getStatus());
00485 }
00486 return overallStatus;
00487 }
00488
00489
00490 std::vector<std::string> getAsStringV() {
00491 std::vector<std::string> V;
00492 V.reserve(nPV);
00493 for (size_t i=0; i<nPV; ++i){
00494 V.push_back(pvd[i].getAsString());
00495 }
00496 return V;
00497 }
00498
00499 int getAsStringV(std::vector<std::string> &valueV, std::vector<int> &statusV) {
00500
00501 valueV.clear();
00502 statusV.clear();
00503 valueV.reserve(nPV);
00504 statusV.reserve(nPV);
00505
00506 for (size_t i=0; i<nPV; ++i){
00507 valueV.push_back(pvd[i].getAsString());
00508 statusV.push_back(pvd[i].getStatus());
00509 }
00510 return overallStatus;
00511 }
00512
00513
00514
00515 std::vector<int> getAsIntV() {
00516 std::vector<int> V;
00517 V.reserve(nPV);
00518 for (size_t i=0; i<nPV; ++i){
00519 V.push_back(pvd[i].getAsInt());
00520 }
00521 return V;
00522 }
00523
00524
00525 int getAsIntV(std::vector<int> &valueV, std::vector<int> &statusV) {
00526
00527 valueV.clear();
00528 statusV.clear();
00529 valueV.reserve(nPV);
00530 statusV.reserve(nPV);
00531
00532 for (size_t i=0; i<nPV; ++i){
00533 valueV.push_back(pvd[i].getAsInt());
00534 statusV.push_back(pvd[i].getStatus());
00535 }
00536 return overallStatus;
00537 }
00538
00539
00540
00541
00542 std::vector<double> getAttributeAsDoubleV(std::string attribute) {
00543 std::vector<double> V;
00544 V.reserve(nPV);
00545 char pvAtt[PVNAME_SIZE];
00546 helper.removeLeadingAndTrailingSpaces(attribute.c_str(), pvAtt);
00547 for (size_t i=0; i<nPV; ++i){
00548 if ( ((std::string)pvAtt).compare((std::string) pvd[i].getAttribute()) ==0){
00549 V.push_back(pvd[i].getAsDouble());
00550 }
00551 }
00552 return V;
00553 }
00554
00555 std::vector<PVDataHolder> getPVDataV() {
00556 std::vector<PVDataHolder> V;
00557 V.reserve(nPV);
00558 for (size_t i=0; i<nPV; ++i){
00559 V.push_back(pvd[i]);
00560 }
00561 return V;
00562 }
00563
00564 PVDataHolder getPVData(unsigned int idx) {
00565 if (idx > (nPV-1)) {
00566 idx=nPV-1;
00567 }
00568 return pvd[idx];
00569 }
00570
00571
00572 PVDataHolder getPVData(std::string name) {
00573 for (size_t i=0; i< nPV; ++i) {
00574 if (bsChannel[i].getName().compare(name) ==0 ) {
00575 return pvd[i];
00576 }
00577 }
00578 }
00579
00580
00581 int getStatus() { return overallStatus;}
00582
00583
00584
00585 static size_t RecvResponseCallback(char * contents, size_t size, size_t nmemb, void * up) {
00586
00587 ++nCBs;
00588 std::cout << "Callback called: " << nCBs << std::endl;
00589 std::cout << "SIZE No. of Bytes " << size*nmemb << std::endl;
00590
00591 std::string sLocal=contents;
00592
00593
00594 std::size_t found = sLocal.find('\n');
00595
00596 if (found != std::string::npos) {
00597 sLocal=sLocal.substr(0, found);
00598 }
00599
00600 contentsBS=contentsBS+sLocal;
00601
00602
00603
00604 return (size_t) size * nmemb;
00605 }
00606
00607
00608
00609
00610
00611 static size_t RecvResponseCallbackLive(char * contents, size_t size, size_t nmemb, void * up) {
00612
00613 callbackLiveFlag=true;
00614
00615
00616 std::string sLocal=contents;
00617
00618
00619 std::size_t found = sLocal.find('\n');
00620
00621 if (found != std::string::npos) {
00622 sLocal=sLocal.substr(0, found);
00623 }
00624
00625
00626
00627
00628 return (size_t) size * nmemb;
00629 }
00630
00631 int reconnect();
00632
00633 bool setBS(bool BSFlag);
00634
00635 bool resetBS() {
00636 closeBS();
00637 return setBS(true);
00638 }
00639
00640 bool setCA(bool CAFlag) {
00641 return CAFlag;
00642 }
00643
00644 void closeBS() {
00645 if (BSInitialized && isBS) {
00646 #if HAVE_ZEROMQ
00647 zmq_close (subscriber);
00648 zmq_ctx_destroy (context);
00649 #endif
00650 delete [] pvd;
00651 }
00652
00653 BSInitialized=false;
00654 isBS=false;
00655 initCalled=false;
00656 return;
00657 }
00658
00659 bool getIsBS() { return isBS;}
00660
00661
00662 };
00663
00664
00665
00667
00668
00669
00671
00672 class DBPMData{
00673 friend class ::CAFE;
00674 private:
00675 double val;
00676 epicsTimeStamp ets;
00677 int status;
00678 public:
00679 double getValue() {return val;}
00680 epicsTimeStamp getEpicsTimeStamp() {return ets;}
00681 int getStatus() { return status;}
00682
00683 DBPMData(){
00684
00685 };
00686 };
00687
00688
00689 class DBPMKeeper
00690 {
00691 friend class ::CAFE;
00692 private:
00693 std::vector<DBPMData> x;
00694 std::vector<DBPMData> y;
00695 std::vector<DBPMData> q;
00696 std::vector<DBPMData> energy;
00697
00698 std::vector<double> offs_x;
00699 std::vector<double> offs_y;
00700
00701 unsigned long long pulse_id;
00702
00703 bool isAllXOK;
00704 bool isAllYOK;
00705 bool isAllQOK;
00706 bool isAllEOK;
00707 bool isAllOK;
00708
00709 std::vector<std::string> pv;
00710 std::vector<unsigned int> handle;
00711 std::vector<std::string> device;
00712 std::vector<float> s;
00713
00714 size_t nDBPM;
00715 size_t nPV;
00716
00717 bool isBS;
00718 bool BSInitialized;
00719 void *context;
00720
00721 void *receiver;
00722 int rc;
00723
00724 #if HAVE_JSON
00725 Json::Value parsedFromString;
00726 Json::Reader reader;
00727 #endif
00728 bool parsingSuccessful;
00729
00730 public:
00731
00732 std::vector<DBPMData> getX() { return x;}
00733 std::vector<DBPMData> getY() { return y;}
00734 std::vector<DBPMData> getQ() { return q;}
00735 std::vector<DBPMData> getEnergy() { return energy;}
00736
00737
00738 std::vector<double> getOffsetX() { return offs_x;}
00739 std::vector<double> getOffsetY() { return offs_y;}
00740
00741 bool getIsAllXOK() {return isAllXOK;}
00742 bool getIsAllYOK() {return isAllYOK;}
00743 bool getIsAllQOK() {return isAllQOK;}
00744 bool getIsAllEOK() {return isAllEOK;}
00745 bool getIsAllOK() {return isAllOK;}
00746
00747 std::vector<std::string> getPV(){ return pv;}
00748 std::vector<unsigned int> getHandle() { return handle;}
00749 std::vector<std::string> getDevice() { return device;}
00750 std::vector<float> getS() { return s;}
00751 size_t getNDBPM() {return nDBPM;}
00752 size_t getNPV() {return nPV;}
00753 int getStatus() {return status;}
00754
00755
00756 int getPVIdx(std::string _pv) {
00757 for (size_t i=0; i< pv.size(); ++i) {
00758 if ( pv[i].compare(_pv) == 0) {
00759 return i;
00760 }
00761 }
00762 return -1;
00763 }
00764
00765
00766 unsigned long long getPulse_id(){return pulse_id;}
00767 void setPulse_id(unsigned long long _pulse_id){pulse_id=_pulse_id;}
00768
00769 PVDataHolder * pvd;
00770 int status;
00771
00772 size_t xIdx;
00773 size_t yIdx;
00774 size_t qIdx;
00775 size_t xValidIdx;
00776 size_t yValidIdx;
00777 size_t qValidIdx;
00778 size_t energyIdx;
00779 size_t endIdx;
00780
00781 void *subscriber;
00782
00783 static size_t RecvResponseCallback(char * contents, size_t size, size_t nmemb, void * up) {
00784
00785 ++nCBs;
00786
00787
00788
00789 std::string sLocal=contents;
00790
00791
00792 std::size_t found = sLocal.find('\n');
00793
00794 if (found != std::string::npos) {
00795
00796 sLocal=sLocal.substr(0, found);
00797 }
00798
00799 contentsS=contentsS+sLocal;
00800
00801 return (size_t) size * nmemb;
00802 }
00803
00804
00805
00806 bool setBS(bool BSFlag) {
00807
00808 if(MUTEX){cafeMutex.lock();}
00809
00810 if (BSFlag) {
00811 #if HAVE_CURL
00812
00813 std::string dataChannels=std::string("{\"channels\":[");
00814 std::vector<std::string> pvNew=pv;
00815
00816 #if HAVE_ZEROMQ
00817
00818 if (!BSInitialized) {
00819
00820 size_t found;
00821 dataChannels= dataChannels + std::string("{\"name\":\"");
00822 dataChannels= dataChannels + pvNew[0];
00823
00824 dataChannels= dataChannels + std::string("\",\"backend\":\"sf-databuffer\",\"modulo\":1,\"offset\":0}" );
00825
00826
00827 for (size_t i=1; i < pvNew.size(); ++i) {
00828
00829 found = pvNew[i].find("SARUN08-DBPM210");
00830 if (found != std::string::npos) continue;
00831 found = pvNew[i].find("SARUN08-DBPM410");
00832 if (found != std::string::npos) continue;
00833
00834 found = pvNew[i].find("ENERGY");
00835 if (found != std::string::npos) continue;
00836
00837
00838 dataChannels= dataChannels + std::string(",{\"name\":\"");
00839 dataChannels= dataChannels + pvNew[i];
00840
00841 dataChannels= dataChannels + std::string("\",\"backend\":\"sf-databuffer\",\"modulo\":1,\"offset\":0}");
00842
00843 }
00844
00845 dataChannels= dataChannels + std::string("],");
00846 dataChannels= dataChannels + "\"mapping\":{\"incomplete\":\"fill-null\"},\"channelValidation\":{\"inconsistency\":\"keep-as-is\"},\"sendBehaviour\":{\"strategy\":\"complete-all\"}}";
00847
00848
00849
00850 const char * data = dataChannels.c_str();
00851
00852
00853
00854 CURL *curl;
00855 CURLcode res;
00856 struct curl_slist * slist;
00857 slist = NULL;
00858
00859 slist = curl_slist_append(slist, "Content-Type: application/json");
00860
00861 curl_global_init(CURL_GLOBAL_ALL);
00862
00863 curl = curl_easy_init();
00864
00865 if (curl) {
00866
00867 curl_easy_setopt(curl, CURLOPT_URL, "https://dispatcher-api.psi.ch/sf/stream");
00868
00869 curl_easy_setopt(curl, CURLOPT_POSTFIELDS, data);
00870
00871 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, slist);
00872 curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
00873
00874
00875
00876 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &RecvResponseCallback);
00877
00878 res = curl_easy_perform(curl);
00879
00880 if (res != CURLE_OK) {
00881 std::cout << "curl_easy_perform failed " << curl_easy_strerror(res) << std::endl;
00882 }
00883 else {
00884 std::cout << " CALLBACK DONE" << std::endl;
00885
00886 curl_easy_cleanup(curl);
00887
00888 curl_slist_free_all(slist);
00889
00890 slist=NULL;
00891 }
00892 }
00893
00894 std::cout << "++++++++++++++++++++++++++++++++++++++++++++++++++++++++++" << std::endl;
00895 curl_global_cleanup();
00896
00897
00898
00899
00900
00901 Json::Value parsedFromString;
00902 Json::Reader reader;
00903 bool parsingSuccessful;
00904
00905 Json::FastWriter fastWriter;
00906 std::string globalZmqStream;
00907
00908
00909
00910
00911 if (contentsS.size() > 2) {
00912 parsingSuccessful=reader.parse(contentsS.c_str(), parsedFromString);
00913 if (parsingSuccessful) {
00914
00915 std::cout << "STYLED: --------------------------------" << std::endl;
00916
00917
00918 std::cout << parsedFromString["stream"] << std::endl;
00919
00920 std::cout << "----------------------------------" << std::endl;
00921 globalZmqStream = fastWriter.write(parsedFromString["stream"]).c_str();
00922 std::cout << globalZmqStream << std::endl;
00923
00924 if ( parsedFromString["stream"].isNull() ) {
00925 globalZmqStream.clear();
00926 }
00927 }
00928 else {
00929 std::cout << "PARSING IN CURL CALLBACK FUNCTION WAS UNSUCCESSFUL !!!" << std::endl;
00930 std::cout << contentsS.c_str() << std::endl;
00931 std::cout << reader.getFormattedErrorMessages() << std::endl;
00932
00933 }
00934 }
00935
00936 if (globalZmqStream.empty()) {
00937 std::cout << "BS Data is not available " << std::endl;
00938 if(MUTEX){cafeMutex.unlock();}
00939 return isBS=false;
00940 }
00941
00942 context = zmq_ctx_new ();
00943
00945
00946
00947
00950
00952
00953 subscriber = zmq_socket (context, ZMQ_SUB);
00954
00955
00956
00957 globalZmqStream=globalZmqStream.substr(1,globalZmqStream.size()-3);
00958
00959
00960 rc = zmq_connect (subscriber, (const char *) globalZmqStream.c_str());
00961
00962 if (rc != 0 ) {
00963 std::cout << " Error is " << zmq_errno() << " " << zmq_strerror(zmq_errno()) << std::endl;
00964 }
00965
00966
00967
00968
00969 int nhwm=1;
00970 int timeoutMS=200;
00971
00972
00973
00974 rc=zmq_setsockopt (subscriber,ZMQ_RCVHWM, &nhwm, sizeof(int));
00975 rc=zmq_setsockopt (subscriber,ZMQ_SNDHWM, &nhwm, sizeof(int));
00976
00977
00978 rc=zmq_setsockopt (subscriber,ZMQ_RCVTIMEO, &timeoutMS, sizeof(int));
00979
00980
00981 rc=zmq_setsockopt (subscriber,ZMQ_SUBSCRIBE,"",0);
00982
00983
00984 BSInitialized=true;
00985
00986 }
00987
00988 #endif //have zeromq
00989
00990 if(MUTEX){cafeMutex.unlock();}
00991 return isBS=BSFlag;
00992 #else //have curl
00993
00994 if(MUTEX){cafeMutex.unlock();}
00995 return isBS=false;
00996 #endif //have curl
00997 }
00998
00999 if(MUTEX){cafeMutex.unlock();}
01000 return isBS=BSFlag;
01001 }
01002
01003
01004
01005
01006 bool resetBS() {
01007 closeBS();
01008 return setBS(true);
01009 }
01010
01011
01012 bool setCA(bool CAFlag) {
01013 return CAFlag;
01014 }
01015
01016 void closeBS() {
01017 if (BSInitialized && isBS) {
01018 #if HAVE_ZEROMQ
01019 zmq_close (subscriber);
01020 zmq_ctx_destroy (context);
01021 #endif
01022 }
01023 BSInitialized=false;
01024 isBS=false;
01025 }
01026
01027 bool getIsBS() { return isBS;}
01028
01029
01030 DBPMKeeper() {};
01031
01032 DBPMKeeper(std::vector<std::string> _pv, std::vector<unsigned int> _handle, std::map<float, std::string> posDev):isBS(false),BSInitialized(false)
01033 {
01034
01035 pv.assign (_pv.begin(), _pv.end());
01036 handle.assign(_handle.begin(),_handle.end());
01037
01038
01039
01040 std::map<float, std::string>::iterator pos;
01041 for (pos =posDev.begin(); pos != posDev.end(); ++pos) {
01042 s.push_back(pos->first); device.push_back(pos->second);
01043 }
01044
01045 pvd = new PVDataHolder[handle.size()];
01046
01047
01048
01049
01050
01051 nDBPM=device.size();
01052 nPV=_pv.size();
01053 status=ICAFE_NORMAL;
01054
01055 xIdx = 0;
01056 yIdx = nDBPM;
01057 qIdx =2*nDBPM;
01058 xValidIdx=3*nDBPM;
01059 yValidIdx=4*nDBPM;
01060 qValidIdx=5*nDBPM;
01061 energyIdx=6*nDBPM;
01062 endIdx =7*nDBPM;
01063 }
01064
01065
01066 DBPMKeeper(std::vector<std::string> _pv, std::vector<unsigned int> _handle, std::vector<std::string> _dev, std::vector<float> _pos):isBS(false),BSInitialized(false)
01067 {
01068
01069 pv.assign (_pv.begin(), _pv.end());
01070 handle.assign(_handle.begin(),_handle.end());
01071
01072 device.assign(_dev.begin(), _dev.end());
01073 s.assign(_pos.begin(), _pos.end());
01074
01075 pvd = new PVDataHolder[handle.size()];
01076
01077
01078
01079
01080
01081 nDBPM=device.size();
01082 nPV=_pv.size();
01083 status=ICAFE_NORMAL;
01084
01085 xIdx = 0;
01086 yIdx = nDBPM;
01087 qIdx =2*nDBPM;
01088 xValidIdx=3*nDBPM;
01089 yValidIdx=4*nDBPM;
01090 qValidIdx=5*nDBPM;
01091 energyIdx=6*nDBPM;
01092 endIdx =7*nDBPM;
01093 }
01094
01095 };
01096
01097
01098 };
01099
01100
01101 #endif //ZBS_DATA_HOLDERS_H