fastcgi++
|
00001 00002 /*************************************************************************** 00003 * Copyright (C) 2007 Eddie Carle [eddie@erctech.org] * 00004 * * 00005 * This file is part of fastcgi++. * 00006 * * 00007 * fastcgi++ is free software: you can redistribute it and/or modify it * 00008 * under the terms of the GNU Lesser General Public License as published * 00009 * by the Free Software Foundation, either version 3 of the License, or (at * 00010 * your option) any later version. * 00011 * * 00012 * fastcgi++ is distributed in the hope that it will be useful, but WITHOUT * 00013 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * 00014 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public * 00015 * License for more details. * 00016 * * 00017 * You should have received a copy of the GNU Lesser General Public License * 00018 * along with fastcgi++. If not, see <http://www.gnu.org/licenses/>. * 00019 ****************************************************************************/ 00020 00021 #ifndef ASQL_HPP 00022 #define ASQL_HPP 00023 00024 #include <vector> 00025 #include <queue> 00026 #include <cstring> 00027 00028 #include <boost/date_time/posix_time/posix_time.hpp> 00029 #include <boost/shared_ptr.hpp> 00030 #include <boost/shared_array.hpp> 00031 #include <boost/scoped_array.hpp> 00032 #include <boost/function.hpp> 00033 #include <boost/bind.hpp> 00034 #include <boost/thread.hpp> 00035 00036 #include <asql/query.hpp> 00037 #include <asql/data.hpp> 00038 00040 namespace ASql 00041 { 00055 template<class T> class Transaction 00056 { 00057 public: 00061 struct Item 00062 { 00063 Item(QueryPar query, T* statement): m_query(query), m_statement(statement) {} 00064 Item(const Item& x): m_query(x.m_query), m_statement(x.m_statement) {} 00065 QueryPar m_query; 00066 T* m_statement; 00067 }; 00068 private: 00069 std::vector<Item> m_items; 00070 public: 00071 typedef typename std::vector<Item>::iterator iterator; 00080 inline void push(QueryPar& query, T& statement) { m_items.push_back(Item(query, &statement)); } 00084 inline void clear() { m_items.clear(); } 00088 inline iterator begin() { return m_items.begin(); } 00092 inline iterator end() { return m_items.end(); } 00096 inline bool empty() { return m_items.size()==0; } 00097 00101 void cancel(); 00102 00106 void start(int instance=-1) { m_items.front().m_statement->connection.queue(*this, instance); } 00107 }; 00108 00112 class Connection 00113 { 00114 public: 00116 int threads() const { return maxThreads; } 00117 00119 bool running() const { return m_threads; } 00120 protected: 00124 const int maxThreads; 00125 boost::mutex threadsMutex; 00126 boost::condition_variable threadsChanged; 00127 int m_threads; 00128 00129 virtual void commit(const unsigned int thread=0)=0; 00130 virtual void rollback(const unsigned int thread=0)=0; 00131 00132 boost::scoped_array<boost::condition_variable> wakeUp; 00133 00134 boost::mutex terminateMutex; 00135 bool terminateBool; 00136 00137 Connection(const int maxThreads_): maxThreads(maxThreads_), m_threads(0), wakeUp(new boost::condition_variable[maxThreads_]) {} 00138 }; 00139 00143 template<class T> class ConnectionPar: public Connection 00144 { 00145 private: 00146 struct QuerySet 00147 { 00148 QuerySet(QueryPar& query, T* const& statement, const bool commit): m_query(query), m_statement(statement), m_commit(commit) {} 00149 QueryPar m_query; 00150 bool m_commit; 00151 T* m_statement; 00152 }; 00156 class Queries: public std::queue<QuerySet>, public boost::mutex {}; 00157 boost::scoped_array<Queries> queries; 00158 00162 void intHandler(const unsigned int id); 00163 00167 class SetCanceler 00168 { 00169 const bool*& m_canceler; 00170 public: 00171 SetCanceler(const bool*& canceler, bool& dest): m_canceler(canceler) { canceler=&dest; } 00172 ~SetCanceler() { m_canceler=&s_false; } 00173 }; 00174 00175 protected: 00176 ConnectionPar(const int maxThreads_): Connection(maxThreads_), queries(new Queries[maxThreads_]) {} 00177 public: 00179 int queriesSize() const; 00180 00184 void start(); 00188 void terminate(); 00192 void queue(Transaction<T>& transaction, int instance); 00193 inline void queue(T* const& statement, QueryPar& query, int instance); 00194 00195 static const bool s_false; 00196 }; 00197 00201 class Statement 00202 { 00203 protected: 00204 boost::scoped_array<Data::Conversions> paramsConversions; 00205 boost::scoped_array<Data::Conversions> resultsConversions; 00206 00207 Statement(unsigned int threads): 00208 paramsConversions(new Data::Conversions[threads]), 00209 resultsConversions(new Data::Conversions[threads]) {} 00210 }; 00211 } 00212 00213 template<class T> void ASql::ConnectionPar<T>::start() 00214 { 00215 { 00216 boost::lock_guard<boost::mutex> terminateLock(terminateMutex); 00217 terminateBool=false; 00218 } 00219 00220 boost::unique_lock<boost::mutex> threadsLock(threadsMutex); 00221 while(m_threads<maxThreads) 00222 { 00223 boost::thread(boost::bind(&ConnectionPar<T>::intHandler, boost::ref(*this), m_threads)); 00224 threadsChanged.wait(threadsLock); 00225 } 00226 } 00227 00228 template<class T> void ASql::ConnectionPar<T>::terminate() 00229 { 00230 { 00231 boost::lock_guard<boost::mutex> terminateLock(terminateMutex); 00232 terminateBool=true; 00233 } 00234 for(boost::condition_variable* i=wakeUp.get(); i<wakeUp.get()+threads(); ++i) 00235 i->notify_all(); 00236 00237 boost::unique_lock<boost::mutex> threadsLock(threadsMutex); 00238 while(m_threads) 00239 threadsChanged.wait(threadsLock); 00240 } 00241 00242 template<class T> void ASql::ConnectionPar<T>::intHandler(const unsigned int id) 00243 { 00244 { 00245 boost::lock_guard<boost::mutex> threadsLock(threadsMutex); 00246 ++m_threads; 00247 } 00248 threadsChanged.notify_one(); 00249 00250 boost::unique_lock<boost::mutex> terminateLock(terminateMutex, boost::defer_lock_t()); 00251 boost::unique_lock<boost::mutex> queriesLock(queries[id], boost::defer_lock_t()); 00252 00253 while(1) 00254 { 00255 terminateLock.lock(); 00256 if(terminateBool) 00257 break; 00258 terminateLock.unlock(); 00259 00260 queriesLock.lock(); 00261 if(!queries[id].size()) 00262 { 00263 wakeUp[id].wait(queriesLock); 00264 queriesLock.unlock(); 00265 continue; 00266 } 00267 QuerySet querySet=queries[id].front(); 00268 queries[id].pop(); 00269 queriesLock.unlock(); 00270 00271 Error error; 00272 00273 try 00274 { 00275 SetCanceler SetCanceler(querySet.m_statement->m_stop[id], querySet.m_query.m_sharedData->m_cancel); 00276 if(querySet.m_query.m_sharedData->m_flags & QueryPar::SharedData::FLAG_SINGLE_PARAMETERS) 00277 { 00278 if(querySet.m_query.m_sharedData->m_flags & QueryPar::SharedData::FLAG_SINGLE_RESULTS) 00279 { 00280 if(!querySet.m_statement->execute(static_cast<const Data::Set*>(querySet.m_query.parameters()), *static_cast<Data::Set*>(querySet.m_query.results()), false, id)) querySet.m_query.clearResults(); 00281 } 00282 else 00283 querySet.m_statement->execute(static_cast<const Data::Set*>(querySet.m_query.parameters()), static_cast<Data::SetContainer*>(querySet.m_query.results()), querySet.m_query.m_sharedData->m_insertId, querySet.m_query.m_sharedData->m_rows, false, id); 00284 } 00285 else 00286 { 00287 querySet.m_statement->execute(*static_cast<const Data::SetContainer*>(querySet.m_query.parameters()), querySet.m_query.m_sharedData->m_rows, false, id); 00288 } 00289 00290 if(querySet.m_commit) 00291 commit(id); 00292 00293 querySet.m_query.m_sharedData->m_error=Error(); 00294 } 00295 catch(const Error& e) 00296 { 00297 querySet.m_query.m_sharedData->m_error=e; 00298 00299 rollback(id); 00300 00301 queriesLock.lock(); 00302 QuerySet tmpQuerySet=querySet; 00303 while(!querySet.m_commit && queries[id].size()) 00304 { 00305 tmpQuerySet=queries[id].front(); 00306 queries[id].pop(); 00307 if(!querySet.m_query.isCallback() && tmpQuerySet.m_query.isCallback()) 00308 querySet.m_query.setCallback(tmpQuerySet.m_query.getCallback()); 00309 00310 } 00311 queriesLock.unlock(); 00312 } 00313 00314 querySet.m_query.callback(); 00315 } 00316 00317 { 00318 boost::lock_guard<boost::mutex> threadsLock(threadsMutex); 00319 --m_threads; 00320 } 00321 threadsChanged.notify_one(); 00322 } 00323 00324 template<class T> void ASql::ConnectionPar<T>::queue(T* const& statement, QueryPar& query, int instance) 00325 { 00326 if(instance == -1) 00327 { 00328 instance=0; 00329 for(unsigned int i=1; i<threads(); ++i) 00330 {{ 00331 boost::lock_guard<boost::mutex> queriesLock(queries[i]); 00332 if(queries[i].size() < queries[instance].size()) 00333 instance=i; 00334 }} 00335 } 00336 00337 boost::lock_guard<boost::mutex> queriesLock(queries[instance]); 00338 queries[instance].push(QuerySet(query, statement, true)); 00339 wakeUp[instance].notify_one(); 00340 } 00341 00342 template<class T> const bool ASql::ConnectionPar<T>::s_false = false; 00343 00344 template<class T> void ASql::ConnectionPar<T>::queue(Transaction<T>& transaction, int instance) 00345 { 00346 if(instance == -1) 00347 { 00348 instance=0; 00349 for(unsigned int i=1; i<threads(); ++i) 00350 {{ 00351 boost::lock_guard<boost::mutex> queriesLock(queries[i]); 00352 if(queries[i].size() < queries[instance].size()) 00353 instance=i; 00354 }} 00355 } 00356 00357 boost::lock_guard<boost::mutex> queriesLock(queries[instance]); 00358 00359 for(typename Transaction<T>::iterator it=transaction.begin(); it!=transaction.end(); ++it) 00360 queries[instance].push(QuerySet(it->m_query, it->m_statement, false)); 00361 queries[instance].back().m_commit = true; 00362 00363 wakeUp[instance].notify_one(); 00364 } 00365 00366 template<class T> void ASql::Transaction<T>::cancel() 00367 { 00368 for(iterator it=begin(); it!=end(); ++it) 00369 it->m_query.cancel(); 00370 } 00371 00372 template<class T> int ASql::ConnectionPar<T>::queriesSize() const 00373 { 00374 int size=0; 00375 for(unsigned int i=1; i<threads(); ++i) 00376 {{ 00377 boost::lock_guard<boost::mutex> queriesLock(queries[i]); 00378 size += queries[i].size(); 00379 }} 00380 00381 return size; 00382 } 00383 00384 #endif