fastcgi++
manager.hpp
Go to the documentation of this file.
00001 
00002 /***************************************************************************
00003 * Copyright (C) 2007 Eddie Carle [eddie@erctech.org]                       *
00004 *                                                                          *
00005 * This file is part of fastcgi++.                                          *
00006 *                                                                          *
00007 * fastcgi++ is free software: you can redistribute it and/or modify it     *
00008 * under the terms of the GNU Lesser General Public License as  published   *
00009 * by the Free Software Foundation, either version 3 of the License, or (at *
00010 * your option) any later version.                                          *
00011 *                                                                          *
00012 * fastcgi++ is distributed in the hope that it will be useful, but WITHOUT *
00013 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or    *
00014 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public     *
00015 * License for more details.                                                *
00016 *                                                                          *
00017 * You should have received a copy of the GNU Lesser General Public License *
00018 * along with fastcgi++.  If not, see <http://www.gnu.org/licenses/>.       *
00019 ****************************************************************************/
00020 
00021 
00022 #ifndef MANAGER_HPP
00023 #define MANAGER_HPP
00024 
00025 #include <map>
00026 #include <string>
00027 #include <queue>
00028 #include <algorithm>
00029 #include <cstring>
00030 
00031 #include <boost/bind.hpp>
00032 #include <boost/shared_ptr.hpp>
00033 #include <boost/thread.hpp>
00034 #include <boost/thread/shared_mutex.hpp>
00035 
00036 #include <signal.h>
00037 #include <pthread.h>
00038 
00039 #include <fastcgi++/protocol.hpp>
00040 #include <fastcgi++/transceiver.hpp>
00041 
00043 namespace Fastcgipp
00044 {
00046 
00056    class ManagerPar
00057    {
00058    public:
00060 
00071       ManagerPar(int fd, const boost::function<void(Protocol::FullId, Message)>& sendMessage_, bool doSetupSignals);
00072 
00073       ~ManagerPar() { instance=0; }
00074 
00076 
00085       void stop();
00086 
00088 
00096       void terminate();
00097       
00099 
00106       void setupSignals();
00107 
00109       size_t getMessagesSize() const { return messages.size(); }
00110 
00111    protected:
00113       Transceiver transceiver;
00114 
00116 
00120       class Tasks: public std::queue<Protocol::FullId>, public boost::mutex {};
00122 
00125       Tasks tasks;
00126 
00128       std::queue<Message> messages;
00129 
00131 
00139       void localHandler(Protocol::FullId id);
00140 
00142       bool asleep;
00144       boost::mutex sleepMutex;
00146 
00150       pthread_t threadId;
00151 
00153 
00156       bool stopBool;
00158       boost::mutex stopMutex;
00160 
00163       bool terminateBool;
00165       boost::mutex terminateMutex;
00166 
00167    private:
00169       static void signalHandler(int signum);
00171       static ManagerPar* instance;
00172    };
00173    
00175 
00185    template<class T> class Manager: public ManagerPar
00186    {
00187    public:
00189 
00199       Manager(int fd=0, bool doSetupSignals=true): ManagerPar(fd, boost::bind(&Manager::push, boost::ref(*this), _1, _2), doSetupSignals) {}
00200 
00202 
00209       void handler();
00210 
00212 
00227       void push(Protocol::FullId id, Message message);
00228 
00230       size_t getRequestsSize() const { return requests.size(); }
00231    private:
00233 
00237       class Requests: public std::map<Protocol::FullId, boost::shared_ptr<T> >, public boost::shared_mutex {};
00239 
00243       Requests requests;
00244    };
00245 }
00246 
00247 template<class T> void Fastcgipp::Manager<T>::push(Protocol::FullId id, Message message)
00248 {
00249    using namespace std;
00250    using namespace Protocol;
00251    using namespace boost;
00252 
00253    if(id.fcgiId)
00254    {
00255       shared_lock<shared_mutex> reqReadLock(requests);
00256       typename Requests::iterator it(requests.find(id));
00257       if(it!=requests.end())
00258       {
00259          lock_guard<mutex> mesLock(it->second->messages);
00260          it->second->messages.push(message);
00261          lock_guard<mutex> tasksLock(tasks);
00262          tasks.push(id);
00263       }
00264       else if(!message.type)
00265       {
00266          Header& header=*(Header*)message.data.get();
00267          if(header.getType()==BEGIN_REQUEST)
00268          {
00269             BeginRequest& body=*(BeginRequest*)(message.data.get()+sizeof(Header));
00270 
00271             reqReadLock.unlock();
00272             unique_lock<shared_mutex> reqWriteLock(requests);
00273 
00274             boost::shared_ptr<T>& request = requests[id];
00275             request.reset(new T);
00276             request->set(id, transceiver, body.getRole(), !body.getKeepConn(), boost::bind(&Manager::push, boost::ref(*this), id, _1));
00277          }
00278          else
00279             return;
00280       }
00281    }
00282    else
00283    {
00284       messages.push(message);
00285       tasks.push(id);
00286    }
00287 
00288    lock_guard<mutex> sleepLock(sleepMutex);
00289    if(asleep)
00290       transceiver.wake();
00291 }
00292 
00293 template<class T> void Fastcgipp::Manager<T>::handler()
00294 {
00295    using namespace std;
00296    using namespace boost;
00297 
00298    threadId=pthread_self();
00299 
00300    while(1)
00301    {{
00302       {
00303          lock_guard<mutex> stopLock(stopMutex);
00304          if(stopBool)
00305          {
00306             stopBool=false;
00307             return;
00308          }
00309       }
00310       
00311       bool sleep=transceiver.handler();
00312 
00313       {
00314          lock_guard<mutex> terminateLock(terminateMutex);
00315          if(terminateBool)
00316          {
00317             shared_lock<shared_mutex> requestsLock(requests);
00318             if(requests.empty() && sleep)
00319             {
00320                terminateBool=false;
00321                return;
00322             }
00323          }
00324       }
00325 
00326       unique_lock<mutex> tasksLock(tasks);
00327       unique_lock<mutex> sleepLock(sleepMutex);
00328 
00329       if(tasks.empty())
00330       {
00331          tasksLock.unlock();
00332 
00333          asleep=true;
00334          sleepLock.unlock();
00335 
00336          if(sleep) transceiver.sleep();
00337 
00338          sleepLock.lock();
00339          asleep=false;
00340          sleepLock.unlock();
00341 
00342          continue;
00343       }
00344 
00345       sleepLock.unlock();
00346 
00347       Protocol::FullId id=tasks.front();
00348       tasks.pop();
00349       tasksLock.unlock();
00350 
00351       if(id.fcgiId==0)
00352          localHandler(id);
00353       else
00354       {
00355          shared_lock<shared_mutex> reqReadLock(requests);
00356          typename map<Protocol::FullId, boost::shared_ptr<T> >::iterator it(requests.find(id));
00357          if(it!=requests.end() && it->second->handler())
00358          {
00359             reqReadLock.unlock();
00360             unique_lock<shared_mutex> reqWriteLock(requests);
00361 
00362             requests.erase(it);
00363          }
00364       }
00365    }}
00366 }
00367 
00368 #endif