fastcgi++
asql.hpp
Go to the documentation of this file.
00001 
00002 /***************************************************************************
00003 * Copyright (C) 2007 Eddie Carle [eddie@erctech.org]                       *
00004 *                                                                          *
00005 * This file is part of fastcgi++.                                          *
00006 *                                                                          *
00007 * fastcgi++ is free software: you can redistribute it and/or modify it     *
00008 * under the terms of the GNU Lesser General Public License as  published   *
00009 * by the Free Software Foundation, either version 3 of the License, or (at *
00010 * your option) any later version.                                          *
00011 *                                                                          *
00012 * fastcgi++ is distributed in the hope that it will be useful, but WITHOUT *
00013 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or    *
00014 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public     *
00015 * License for more details.                                                *
00016 *                                                                          *
00017 * You should have received a copy of the GNU Lesser General Public License *
00018 * along with fastcgi++.  If not, see <http://www.gnu.org/licenses/>.       *
00019 ****************************************************************************/
00020 
00021 #ifndef ASQL_HPP
00022 #define ASQL_HPP
00023 
00024 #include <vector>
00025 #include <queue>
00026 #include <cstring>
00027 
00028 #include <boost/date_time/posix_time/posix_time.hpp>
00029 #include <boost/shared_ptr.hpp>
00030 #include <boost/shared_array.hpp>
00031 #include <boost/scoped_array.hpp>
00032 #include <boost/function.hpp>
00033 #include <boost/bind.hpp>
00034 #include <boost/thread.hpp>
00035 
00036 #include <asql/query.hpp>
00037 #include <asql/data.hpp>
00038 
00040 namespace ASql
00041 {
00055    template<class T> class Transaction
00056    {
00057    public:
00061       struct Item
00062       {
00063          Item(QueryPar query, T* statement): m_query(query), m_statement(statement) {}
00064          Item(const Item& x): m_query(x.m_query), m_statement(x.m_statement) {}
00065          QueryPar m_query;
00066          T* m_statement;
00067       };
00068    private:
00069       std::vector<Item> m_items;
00070    public:
00071       typedef typename std::vector<Item>::iterator iterator;
00080       inline void push(QueryPar& query, T& statement) { m_items.push_back(Item(query, &statement)); }
00084       inline void clear() { m_items.clear(); }
00088       inline iterator begin() { return m_items.begin(); }
00092       inline iterator end() { return m_items.end(); }
00096       inline bool empty() { return m_items.size()==0; }
00097 
00101       void cancel();
00102 
00106       void start(int instance=-1) { m_items.front().m_statement->connection.queue(*this, instance); }
00107    };
00108 
00112    class Connection
00113    {
00114    public:
00116       int threads() const { return maxThreads; }
00117 
00119       bool running() const { return m_threads; }
00120    protected:
00124       const int maxThreads;
00125       boost::mutex threadsMutex;
00126       boost::condition_variable threadsChanged;
00127       int m_threads;
00128 
00129       virtual void commit(const unsigned int thread=0)=0;
00130       virtual void rollback(const unsigned int thread=0)=0;
00131 
00132       boost::scoped_array<boost::condition_variable> wakeUp;
00133 
00134       boost::mutex terminateMutex;
00135       bool terminateBool;
00136 
00137       Connection(const int maxThreads_): maxThreads(maxThreads_), m_threads(0), wakeUp(new boost::condition_variable[maxThreads_]) {}
00138    };
00139 
00143    template<class T> class ConnectionPar: public Connection
00144    {
00145    private:
00146       struct QuerySet
00147       {
00148          QuerySet(QueryPar& query, T* const& statement, const bool commit): m_query(query), m_statement(statement), m_commit(commit) {}
00149          QueryPar m_query;
00150          bool m_commit;
00151          T* m_statement;
00152       };
00156       class Queries: public std::queue<QuerySet>, public boost::mutex {};
00157       boost::scoped_array<Queries> queries;
00158 
00162       void intHandler(const unsigned int id);
00163 
00167       class SetCanceler
00168       {
00169          const bool*& m_canceler;
00170       public:
00171          SetCanceler(const bool*& canceler, bool& dest): m_canceler(canceler) { canceler=&dest; }
00172          ~SetCanceler() { m_canceler=&s_false; }         
00173       };
00174 
00175    protected:
00176       ConnectionPar(const int maxThreads_): Connection(maxThreads_), queries(new Queries[maxThreads_]) {}
00177    public:
00179       int queriesSize() const; 
00180 
00184       void start();
00188       void terminate();
00192       void queue(Transaction<T>& transaction, int instance);
00193       inline void queue(T* const& statement, QueryPar& query, int instance);
00194 
00195       static const bool s_false;
00196    }; 
00197 
00201    class Statement
00202    {
00203    protected:
00204       boost::scoped_array<Data::Conversions> paramsConversions;
00205       boost::scoped_array<Data::Conversions> resultsConversions;
00206    
00207    Statement(unsigned int threads):
00208       paramsConversions(new Data::Conversions[threads]),
00209       resultsConversions(new Data::Conversions[threads]) {}
00210    };
00211 }
00212 
00213 template<class T> void ASql::ConnectionPar<T>::start()
00214 {
00215    {
00216       boost::lock_guard<boost::mutex> terminateLock(terminateMutex);
00217       terminateBool=false;
00218    }
00219    
00220    boost::unique_lock<boost::mutex> threadsLock(threadsMutex);
00221    while(m_threads<maxThreads)
00222    {
00223       boost::thread(boost::bind(&ConnectionPar<T>::intHandler, boost::ref(*this), m_threads));
00224       threadsChanged.wait(threadsLock);
00225    }
00226 }
00227 
00228 template<class T> void ASql::ConnectionPar<T>::terminate()
00229 {
00230    {
00231       boost::lock_guard<boost::mutex> terminateLock(terminateMutex);
00232       terminateBool=true;
00233    }
00234    for(boost::condition_variable* i=wakeUp.get(); i<wakeUp.get()+threads(); ++i)
00235       i->notify_all();
00236 
00237    boost::unique_lock<boost::mutex> threadsLock(threadsMutex);
00238    while(m_threads)
00239       threadsChanged.wait(threadsLock);
00240 }
00241 
00242 template<class T> void ASql::ConnectionPar<T>::intHandler(const unsigned int id)
00243 {
00244    {
00245       boost::lock_guard<boost::mutex> threadsLock(threadsMutex);
00246       ++m_threads;
00247    }
00248    threadsChanged.notify_one();
00249    
00250    boost::unique_lock<boost::mutex> terminateLock(terminateMutex, boost::defer_lock_t());
00251    boost::unique_lock<boost::mutex> queriesLock(queries[id], boost::defer_lock_t());
00252 
00253    while(1)
00254    {
00255       terminateLock.lock();
00256       if(terminateBool)
00257          break;
00258       terminateLock.unlock();
00259       
00260       queriesLock.lock();
00261       if(!queries[id].size())
00262       {
00263          wakeUp[id].wait(queriesLock);
00264          queriesLock.unlock();
00265          continue;
00266       }
00267       QuerySet querySet=queries[id].front();
00268       queries[id].pop();
00269       queriesLock.unlock();
00270 
00271       Error error;
00272 
00273       try
00274       {
00275          SetCanceler SetCanceler(querySet.m_statement->m_stop[id], querySet.m_query.m_sharedData->m_cancel);
00276          if(querySet.m_query.m_sharedData->m_flags & QueryPar::SharedData::FLAG_SINGLE_PARAMETERS)
00277          {
00278             if(querySet.m_query.m_sharedData->m_flags & QueryPar::SharedData::FLAG_SINGLE_RESULTS)
00279             {
00280                if(!querySet.m_statement->execute(static_cast<const Data::Set*>(querySet.m_query.parameters()), *static_cast<Data::Set*>(querySet.m_query.results()), false, id)) querySet.m_query.clearResults();
00281             }
00282             else
00283                querySet.m_statement->execute(static_cast<const Data::Set*>(querySet.m_query.parameters()), static_cast<Data::SetContainer*>(querySet.m_query.results()), querySet.m_query.m_sharedData->m_insertId, querySet.m_query.m_sharedData->m_rows, false, id);
00284          }
00285          else
00286          {
00287             querySet.m_statement->execute(*static_cast<const Data::SetContainer*>(querySet.m_query.parameters()), querySet.m_query.m_sharedData->m_rows, false, id);
00288          }
00289 
00290          if(querySet.m_commit)
00291             commit(id);
00292 
00293          querySet.m_query.m_sharedData->m_error=Error();
00294       }
00295       catch(const Error& e)
00296       {
00297          querySet.m_query.m_sharedData->m_error=e;
00298 
00299          rollback(id);
00300 
00301          queriesLock.lock();
00302          QuerySet tmpQuerySet=querySet;
00303          while(!querySet.m_commit && queries[id].size())
00304          {
00305             tmpQuerySet=queries[id].front();
00306             queries[id].pop();
00307             if(!querySet.m_query.isCallback() && tmpQuerySet.m_query.isCallback())
00308                querySet.m_query.setCallback(tmpQuerySet.m_query.getCallback());
00309 
00310          }
00311          queriesLock.unlock();
00312       }
00313 
00314       querySet.m_query.callback();
00315    }
00316 
00317    {
00318       boost::lock_guard<boost::mutex> threadsLock(threadsMutex);
00319       --m_threads;
00320    }
00321    threadsChanged.notify_one();
00322 }
00323 
00324 template<class T> void ASql::ConnectionPar<T>::queue(T* const& statement, QueryPar& query, int instance)
00325 {
00326    if(instance == -1)
00327    {
00328       instance=0;
00329       for(unsigned int i=1; i<threads(); ++i)
00330       {{
00331          boost::lock_guard<boost::mutex> queriesLock(queries[i]);
00332          if(queries[i].size() < queries[instance].size())
00333             instance=i;
00334       }}
00335    }
00336 
00337    boost::lock_guard<boost::mutex> queriesLock(queries[instance]);
00338    queries[instance].push(QuerySet(query, statement, true));
00339    wakeUp[instance].notify_one();
00340 }
00341 
00342 template<class T> const bool ASql::ConnectionPar<T>::s_false = false;
00343 
00344 template<class T> void ASql::ConnectionPar<T>::queue(Transaction<T>& transaction, int instance)
00345 {
00346    if(instance == -1)
00347    {
00348       instance=0;
00349       for(unsigned int i=1; i<threads(); ++i)
00350       {{
00351          boost::lock_guard<boost::mutex> queriesLock(queries[i]);
00352          if(queries[i].size() < queries[instance].size())
00353             instance=i;
00354       }}
00355    }
00356 
00357    boost::lock_guard<boost::mutex> queriesLock(queries[instance]);
00358 
00359    for(typename Transaction<T>::iterator it=transaction.begin(); it!=transaction.end(); ++it)
00360       queries[instance].push(QuerySet(it->m_query, it->m_statement, false));
00361    queries[instance].back().m_commit = true;
00362 
00363    wakeUp[instance].notify_one();
00364 }
00365 
00366 template<class T> void ASql::Transaction<T>::cancel()
00367 {
00368    for(iterator it=begin(); it!=end(); ++it)
00369       it->m_query.cancel();
00370 }
00371 
00372 template<class T> int ASql::ConnectionPar<T>::queriesSize() const
00373 {
00374    int size=0;
00375    for(unsigned int i=1; i<threads(); ++i)
00376    {{
00377       boost::lock_guard<boost::mutex> queriesLock(queries[i]);
00378       size += queries[i].size();
00379    }}
00380    
00381    return size;
00382 }
00383 
00384 #endif