fastcgi++
asql.hpp
Go to the documentation of this file.
1 
2 /***************************************************************************
3 * Copyright (C) 2007 Eddie Carle [eddie@erctech.org] *
4 * *
5 * This file is part of fastcgi++. *
6 * *
7 * fastcgi++ is free software: you can redistribute it and/or modify it *
8 * under the terms of the GNU Lesser General Public License as published *
9 * by the Free Software Foundation, either version 3 of the License, or (at *
10 * your option) any later version. *
11 * *
12 * fastcgi++ is distributed in the hope that it will be useful, but WITHOUT *
13 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or *
14 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public *
15 * License for more details. *
16 * *
17 * You should have received a copy of the GNU Lesser General Public License *
18 * along with fastcgi++. If not, see <http://www.gnu.org/licenses/>. *
19 ****************************************************************************/
20 
21 #ifndef ASQL_HPP
22 #define ASQL_HPP
23 
24 #include <vector>
25 #include <queue>
26 #include <cstring>
27 
28 #include <boost/date_time/posix_time/posix_time.hpp>
29 #include <boost/shared_ptr.hpp>
30 #include <boost/shared_array.hpp>
31 #include <boost/scoped_array.hpp>
32 #include <boost/function.hpp>
33 #include <boost/bind.hpp>
34 #include <boost/thread.hpp>
35 
36 #include <asql/query.hpp>
37 #include <asql/data.hpp>
38 
40 namespace ASql
41 {
55  template<class T> class Transaction
56  {
57  public:
61  struct Item
62  {
63  Item(QueryPar query, T* statement): m_query(query), m_statement(statement) {}
67  };
68  private:
69  std::vector<Item> m_items;
70  public:
71  typedef typename std::vector<Item>::iterator iterator;
80  inline void push(QueryPar& query, T& statement) { m_items.push_back(Item(query, &statement)); }
84  inline void clear() { m_items.clear(); }
88  inline iterator begin() { return m_items.begin(); }
92  inline iterator end() { return m_items.end(); }
96  inline bool empty() { return m_items.size()==0; }
97 
101  void cancel();
102 
106  void start(int instance=-1) { m_items.front().m_statement->connection.queue(*this, instance); }
107  };
108 
113  {
114  public:
116  int threads() const { return maxThreads; }
117 
119  bool running() const { return m_threads; }
120  protected:
124  const int maxThreads;
125  boost::mutex threadsMutex;
126  boost::condition_variable threadsChanged;
128 
129  virtual void commit(const unsigned int thread=0)=0;
130  virtual void rollback(const unsigned int thread=0)=0;
131 
132  boost::scoped_array<boost::condition_variable> wakeUp;
133 
134  boost::mutex terminateMutex;
136 
137  Connection(const int maxThreads_): maxThreads(maxThreads_), m_threads(0), wakeUp(new boost::condition_variable[maxThreads_]) {}
138  };
139 
143  template<class T> class ConnectionPar: public Connection
144  {
145  private:
146  struct QuerySet
147  {
148  QuerySet(QueryPar& query, T* const& statement, const bool commit): m_query(query), m_statement(statement), m_commit(commit) {}
150  bool m_commit;
152  };
156  class Queries: public std::queue<QuerySet>, public boost::mutex {};
157  boost::scoped_array<Queries> queries;
158 
162  void intHandler(const unsigned int id);
163 
168  {
169  const bool*& m_canceler;
170  public:
171  SetCanceler(const bool*& canceler, bool& dest): m_canceler(canceler) { canceler=&dest; }
173  };
174 
175  protected:
176  ConnectionPar(const int maxThreads_): Connection(maxThreads_), queries(new Queries[maxThreads_]) {}
177  public:
179  int queriesSize() const;
180 
184  void start();
188  void terminate();
192  void queue(Transaction<T>& transaction, int instance);
193  inline void queue(T* const& statement, QueryPar& query, int instance);
194 
195  static const bool s_false;
196  };
197 
201  class Statement
202  {
203  protected:
204  boost::scoped_array<Data::Conversions> paramsConversions;
205  boost::scoped_array<Data::Conversions> resultsConversions;
206 
207  Statement(unsigned int threads):
208  paramsConversions(new Data::Conversions[threads]),
209  resultsConversions(new Data::Conversions[threads]) {}
210  };
211 }
212 
213 template<class T> void ASql::ConnectionPar<T>::start()
214 {
215  {
216  boost::lock_guard<boost::mutex> terminateLock(terminateMutex);
217  terminateBool=false;
218  }
219 
220  boost::unique_lock<boost::mutex> threadsLock(threadsMutex);
221  while(m_threads<maxThreads)
222  {
223  boost::thread(boost::bind(&ConnectionPar<T>::intHandler, boost::ref(*this), m_threads));
224  threadsChanged.wait(threadsLock);
225  }
226 }
227 
228 template<class T> void ASql::ConnectionPar<T>::terminate()
229 {
230  {
231  boost::lock_guard<boost::mutex> terminateLock(terminateMutex);
232  terminateBool=true;
233  }
234  for(boost::condition_variable* i=wakeUp.get(); i<wakeUp.get()+threads(); ++i)
235  i->notify_all();
236 
237  boost::unique_lock<boost::mutex> threadsLock(threadsMutex);
238  while(m_threads)
239  threadsChanged.wait(threadsLock);
240 }
241 
242 template<class T> void ASql::ConnectionPar<T>::intHandler(const unsigned int id)
243 {
244  {
245  boost::lock_guard<boost::mutex> threadsLock(threadsMutex);
246  ++m_threads;
247  }
248  threadsChanged.notify_one();
249 
250  boost::unique_lock<boost::mutex> terminateLock(terminateMutex, boost::defer_lock_t());
251  boost::unique_lock<boost::mutex> queriesLock(queries[id], boost::defer_lock_t());
252 
253  while(1)
254  {
255  terminateLock.lock();
256  if(terminateBool)
257  break;
258  terminateLock.unlock();
259 
260  queriesLock.lock();
261  if(!queries[id].size())
262  {
263  wakeUp[id].wait(queriesLock);
264  queriesLock.unlock();
265  continue;
266  }
267  QuerySet querySet=queries[id].front();
268  queries[id].pop();
269  queriesLock.unlock();
270 
271  Error error;
272 
273  try
274  {
275  SetCanceler SetCanceler(querySet.m_statement->m_stop[id], querySet.m_query.m_sharedData->m_cancel);
276  if(querySet.m_query.m_sharedData->m_flags & QueryPar::SharedData::FLAG_SINGLE_PARAMETERS)
277  {
278  if(querySet.m_query.m_sharedData->m_flags & QueryPar::SharedData::FLAG_SINGLE_RESULTS)
279  {
280  if(!querySet.m_statement->execute(static_cast<const Data::Set*>(querySet.m_query.parameters()), *static_cast<Data::Set*>(querySet.m_query.results()), false, id)) querySet.m_query.clearResults();
281  }
282  else
283  querySet.m_statement->execute(static_cast<const Data::Set*>(querySet.m_query.parameters()), static_cast<Data::SetContainer*>(querySet.m_query.results()), querySet.m_query.m_sharedData->m_insertId, querySet.m_query.m_sharedData->m_rows, false, id);
284  }
285  else
286  {
287  querySet.m_statement->execute(*static_cast<const Data::SetContainer*>(querySet.m_query.parameters()), querySet.m_query.m_sharedData->m_rows, false, id);
288  }
289 
290  if(querySet.m_commit)
291  commit(id);
292 
293  querySet.m_query.m_sharedData->m_error=Error();
294  }
295  catch(const Error& e)
296  {
297  querySet.m_query.m_sharedData->m_error=e;
298 
299  rollback(id);
300 
301  queriesLock.lock();
302  QuerySet tmpQuerySet=querySet;
303  while(!querySet.m_commit && queries[id].size())
304  {
305  tmpQuerySet=queries[id].front();
306  queries[id].pop();
307  if(!querySet.m_query.isCallback() && tmpQuerySet.m_query.isCallback())
308  querySet.m_query.setCallback(tmpQuerySet.m_query.getCallback());
309 
310  }
311  queriesLock.unlock();
312  }
313 
314  querySet.m_query.callback();
315  }
316 
317  {
318  boost::lock_guard<boost::mutex> threadsLock(threadsMutex);
319  --m_threads;
320  }
321  threadsChanged.notify_one();
322 }
323 
324 template<class T> void ASql::ConnectionPar<T>::queue(T* const& statement, QueryPar& query, int instance)
325 {
326  if(instance == -1)
327  {
328  instance=0;
329  for(unsigned int i=1; i<threads(); ++i)
330  {{
331  boost::lock_guard<boost::mutex> queriesLock(queries[i]);
332  if(queries[i].size() < queries[instance].size())
333  instance=i;
334  }}
335  }
336 
337  boost::lock_guard<boost::mutex> queriesLock(queries[instance]);
338  queries[instance].push(QuerySet(query, statement, true));
339  wakeUp[instance].notify_one();
340 }
341 
342 template<class T> const bool ASql::ConnectionPar<T>::s_false = false;
343 
344 template<class T> void ASql::ConnectionPar<T>::queue(Transaction<T>& transaction, int instance)
345 {
346  if(instance == -1)
347  {
348  instance=0;
349  for(unsigned int i=1; i<threads(); ++i)
350  {{
351  boost::lock_guard<boost::mutex> queriesLock(queries[i]);
352  if(queries[i].size() < queries[instance].size())
353  instance=i;
354  }}
355  }
356 
357  boost::lock_guard<boost::mutex> queriesLock(queries[instance]);
358 
359  for(typename Transaction<T>::iterator it=transaction.begin(); it!=transaction.end(); ++it)
360  queries[instance].push(QuerySet(it->m_query, it->m_statement, false));
361  queries[instance].back().m_commit = true;
362 
363  wakeUp[instance].notify_one();
364 }
365 
366 template<class T> void ASql::Transaction<T>::cancel()
367 {
368  for(iterator it=begin(); it!=end(); ++it)
369  it->m_query.cancel();
370 }
371 
372 template<class T> int ASql::ConnectionPar<T>::queriesSize() const
373 {
374  int size=0;
375  for(unsigned int i=1; i<threads(); ++i)
376  {{
377  boost::lock_guard<boost::mutex> queriesLock(queries[i]);
378  size += queries[i].size();
379  }}
380 
381  return size;
382 }
383 
384 #endif