fastcgi++
manager.hpp
Go to the documentation of this file.
1 
2 /***************************************************************************
3 * Copyright (C) 2007 Eddie Carle [eddie@erctech.org] *
4 * *
5 * This file is part of fastcgi++. *
6 * *
7 * fastcgi++ is free software: you can redistribute it and/or modify it *
8 * under the terms of the GNU Lesser General Public License as published *
9 * by the Free Software Foundation, either version 3 of the License, or (at *
10 * your option) any later version. *
11 * *
12 * fastcgi++ is distributed in the hope that it will be useful, but WITHOUT *
13 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or *
14 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public *
15 * License for more details. *
16 * *
17 * You should have received a copy of the GNU Lesser General Public License *
18 * along with fastcgi++. If not, see <http://www.gnu.org/licenses/>. *
19 ****************************************************************************/
20 
21 
22 #ifndef MANAGER_HPP
23 #define MANAGER_HPP
24 
25 #include <map>
26 #include <string>
27 #include <queue>
28 #include <algorithm>
29 #include <cstring>
30 
31 #include <boost/bind.hpp>
32 #include <boost/shared_ptr.hpp>
33 #include <boost/thread.hpp>
34 #include <boost/thread/shared_mutex.hpp>
35 
36 #include <signal.h>
37 #include <pthread.h>
38 
39 #include <fastcgi++/protocol.hpp>
41 
43 namespace Fastcgipp
44 {
46 
56  class ManagerPar
57  {
58  public:
60 
71  ManagerPar(int fd, const boost::function<void(Protocol::FullId, Message)>& sendMessage_, bool doSetupSignals);
72 
74 
76 
85  void stop();
86 
88 
96  void terminate();
97 
99 
106  void setupSignals();
107 
109  size_t getMessagesSize() const { return messages.size(); }
110 
111  protected:
114 
116 
120  class Tasks: public std::queue<Protocol::FullId>, public boost::mutex {};
122 
126 
128  std::queue<Message> messages;
129 
131 
140 
142  bool asleep;
144  boost::mutex sleepMutex;
146 
150  pthread_t threadId;
151 
153 
156  bool stopBool;
158  boost::mutex stopMutex;
160 
165  boost::mutex terminateMutex;
166 
167  private:
169  static void signalHandler(int signum);
172  };
173 
175 
185  template<class T> class Manager: public ManagerPar
186  {
187  public:
189 
199  Manager(int fd=0, bool doSetupSignals=true): ManagerPar(fd, boost::bind(&Manager::push, boost::ref(*this), _1, _2), doSetupSignals) {}
200 
202 
209  void handler();
210 
212 
227  void push(Protocol::FullId id, Message message);
228 
230  size_t getRequestsSize() const { return requests.size(); }
231  private:
233 
237  class Requests: public std::map<Protocol::FullId, boost::shared_ptr<T> >, public boost::shared_mutex {};
239 
244  };
245 }
246 
247 template<class T> void Fastcgipp::Manager<T>::push(Protocol::FullId id, Message message)
248 {
249  using namespace std;
250  using namespace Protocol;
251  using namespace boost;
252 
253  if(id.fcgiId)
254  {
255  shared_lock<shared_mutex> reqReadLock(requests);
256  typename Requests::iterator it(requests.find(id));
257  if(it!=requests.end())
258  {
259  lock_guard<mutex> mesLock(it->second->messages);
260  it->second->messages.push(message);
261  lock_guard<mutex> tasksLock(tasks);
262  tasks.push(id);
263  }
264  else if(!message.type)
265  {
266  Header& header=*(Header*)message.data.get();
267  if(header.getType()==BEGIN_REQUEST)
268  {
269  BeginRequest& body=*(BeginRequest*)(message.data.get()+sizeof(Header));
270 
271  reqReadLock.unlock();
272  unique_lock<shared_mutex> reqWriteLock(requests);
273 
274  boost::shared_ptr<T>& request = requests[id];
275  request.reset(new T);
276  request->set(id, transceiver, body.getRole(), !body.getKeepConn(), boost::bind(&Manager::push, boost::ref(*this), id, _1));
277  }
278  else
279  return;
280  }
281  }
282  else
283  {
284  messages.push(message);
285  tasks.push(id);
286  }
287 
288  lock_guard<mutex> sleepLock(sleepMutex);
289  if(asleep)
290  transceiver.wake();
291 }
292 
293 template<class T> void Fastcgipp::Manager<T>::handler()
294 {
295  using namespace std;
296  using namespace boost;
297 
298  threadId=pthread_self();
299 
300  while(1)
301  {{
302  {
303  lock_guard<mutex> stopLock(stopMutex);
304  if(stopBool)
305  {
306  stopBool=false;
307  return;
308  }
309  }
310 
311  bool sleep=transceiver.handler();
312 
313  {
314  lock_guard<mutex> terminateLock(terminateMutex);
315  if(terminateBool)
316  {
317  shared_lock<shared_mutex> requestsLock(requests);
318  if(requests.empty() && sleep)
319  {
320  terminateBool=false;
321  return;
322  }
323  }
324  }
325 
326  unique_lock<mutex> tasksLock(tasks);
327  unique_lock<mutex> sleepLock(sleepMutex);
328 
329  if(tasks.empty())
330  {
331  tasksLock.unlock();
332 
333  asleep=true;
334  sleepLock.unlock();
335 
336  if(sleep) transceiver.sleep();
337 
338  sleepLock.lock();
339  asleep=false;
340  sleepLock.unlock();
341 
342  continue;
343  }
344 
345  sleepLock.unlock();
346 
347  Protocol::FullId id=tasks.front();
348  tasks.pop();
349  tasksLock.unlock();
350 
351  if(id.fcgiId==0)
352  localHandler(id);
353  else
354  {
355  shared_lock<shared_mutex> reqReadLock(requests);
356  typename map<Protocol::FullId, boost::shared_ptr<T> >::iterator it(requests.find(id));
357  if(it!=requests.end() && it->second->handler())
358  {
359  reqReadLock.unlock();
360  unique_lock<shared_mutex> reqWriteLock(requests);
361 
362  requests.erase(it);
363  }
364  }
365  }}
366 }
367 
368 #endif