fastcgi++
|
00001 00002 /*************************************************************************** 00003 * Copyright (C) 2007 Eddie Carle [eddie@erctech.org] * 00004 * * 00005 * This file is part of fastcgi++. * 00006 * * 00007 * fastcgi++ is free software: you can redistribute it and/or modify it * 00008 * under the terms of the GNU Lesser General Public License as published * 00009 * by the Free Software Foundation, either version 3 of the License, or (at * 00010 * your option) any later version. * 00011 * * 00012 * fastcgi++ is distributed in the hope that it will be useful, but WITHOUT * 00013 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * 00014 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public * 00015 * License for more details. * 00016 * * 00017 * You should have received a copy of the GNU Lesser General Public License * 00018 * along with fastcgi++. If not, see <http://www.gnu.org/licenses/>. * 00019 ****************************************************************************/ 00020 00021 00022 #ifndef MANAGER_HPP 00023 #define MANAGER_HPP 00024 00025 #include <map> 00026 #include <string> 00027 #include <queue> 00028 #include <algorithm> 00029 #include <cstring> 00030 00031 #include <boost/bind.hpp> 00032 #include <boost/shared_ptr.hpp> 00033 #include <boost/thread.hpp> 00034 #include <boost/thread/shared_mutex.hpp> 00035 00036 #include <signal.h> 00037 #include <pthread.h> 00038 00039 #include <fastcgi++/protocol.hpp> 00040 #include <fastcgi++/transceiver.hpp> 00041 00043 namespace Fastcgipp 00044 { 00046 00056 class ManagerPar 00057 { 00058 public: 00060 00071 ManagerPar(int fd, const boost::function<void(Protocol::FullId, Message)>& sendMessage_, bool doSetupSignals); 00072 00073 ~ManagerPar() { instance=0; } 00074 00076 00085 void stop(); 00086 00088 00096 void terminate(); 00097 00099 00106 void setupSignals(); 00107 00109 size_t getMessagesSize() const { return messages.size(); } 00110 00111 protected: 00113 Transceiver transceiver; 00114 00116 00120 class Tasks: public std::queue<Protocol::FullId>, public boost::mutex {}; 00122 00125 Tasks tasks; 00126 00128 std::queue<Message> messages; 00129 00131 00139 void localHandler(Protocol::FullId id); 00140 00142 bool asleep; 00144 boost::mutex sleepMutex; 00146 00150 pthread_t threadId; 00151 00153 00156 bool stopBool; 00158 boost::mutex stopMutex; 00160 00163 bool terminateBool; 00165 boost::mutex terminateMutex; 00166 00167 private: 00169 static void signalHandler(int signum); 00171 static ManagerPar* instance; 00172 }; 00173 00175 00185 template<class T> class Manager: public ManagerPar 00186 { 00187 public: 00189 00199 Manager(int fd=0, bool doSetupSignals=true): ManagerPar(fd, boost::bind(&Manager::push, boost::ref(*this), _1, _2), doSetupSignals) {} 00200 00202 00209 void handler(); 00210 00212 00227 void push(Protocol::FullId id, Message message); 00228 00230 size_t getRequestsSize() const { return requests.size(); } 00231 private: 00233 00237 class Requests: public std::map<Protocol::FullId, boost::shared_ptr<T> >, public boost::shared_mutex {}; 00239 00243 Requests requests; 00244 }; 00245 } 00246 00247 template<class T> void Fastcgipp::Manager<T>::push(Protocol::FullId id, Message message) 00248 { 00249 using namespace std; 00250 using namespace Protocol; 00251 using namespace boost; 00252 00253 if(id.fcgiId) 00254 { 00255 shared_lock<shared_mutex> reqReadLock(requests); 00256 typename Requests::iterator it(requests.find(id)); 00257 if(it!=requests.end()) 00258 { 00259 lock_guard<mutex> mesLock(it->second->messages); 00260 it->second->messages.push(message); 00261 lock_guard<mutex> tasksLock(tasks); 00262 tasks.push(id); 00263 } 00264 else if(!message.type) 00265 { 00266 Header& header=*(Header*)message.data.get(); 00267 if(header.getType()==BEGIN_REQUEST) 00268 { 00269 BeginRequest& body=*(BeginRequest*)(message.data.get()+sizeof(Header)); 00270 00271 reqReadLock.unlock(); 00272 unique_lock<shared_mutex> reqWriteLock(requests); 00273 00274 boost::shared_ptr<T>& request = requests[id]; 00275 request.reset(new T); 00276 request->set(id, transceiver, body.getRole(), !body.getKeepConn(), boost::bind(&Manager::push, boost::ref(*this), id, _1)); 00277 } 00278 else 00279 return; 00280 } 00281 } 00282 else 00283 { 00284 messages.push(message); 00285 tasks.push(id); 00286 } 00287 00288 lock_guard<mutex> sleepLock(sleepMutex); 00289 if(asleep) 00290 transceiver.wake(); 00291 } 00292 00293 template<class T> void Fastcgipp::Manager<T>::handler() 00294 { 00295 using namespace std; 00296 using namespace boost; 00297 00298 threadId=pthread_self(); 00299 00300 while(1) 00301 {{ 00302 { 00303 lock_guard<mutex> stopLock(stopMutex); 00304 if(stopBool) 00305 { 00306 stopBool=false; 00307 return; 00308 } 00309 } 00310 00311 bool sleep=transceiver.handler(); 00312 00313 { 00314 lock_guard<mutex> terminateLock(terminateMutex); 00315 if(terminateBool) 00316 { 00317 shared_lock<shared_mutex> requestsLock(requests); 00318 if(requests.empty() && sleep) 00319 { 00320 terminateBool=false; 00321 return; 00322 } 00323 } 00324 } 00325 00326 unique_lock<mutex> tasksLock(tasks); 00327 unique_lock<mutex> sleepLock(sleepMutex); 00328 00329 if(tasks.empty()) 00330 { 00331 tasksLock.unlock(); 00332 00333 asleep=true; 00334 sleepLock.unlock(); 00335 00336 if(sleep) transceiver.sleep(); 00337 00338 sleepLock.lock(); 00339 asleep=false; 00340 sleepLock.unlock(); 00341 00342 continue; 00343 } 00344 00345 sleepLock.unlock(); 00346 00347 Protocol::FullId id=tasks.front(); 00348 tasks.pop(); 00349 tasksLock.unlock(); 00350 00351 if(id.fcgiId==0) 00352 localHandler(id); 00353 else 00354 { 00355 shared_lock<shared_mutex> reqReadLock(requests); 00356 typename map<Protocol::FullId, boost::shared_ptr<T> >::iterator it(requests.find(id)); 00357 if(it!=requests.end() && it->second->handler()) 00358 { 00359 reqReadLock.unlock(); 00360 unique_lock<shared_mutex> reqWriteLock(requests); 00361 00362 requests.erase(it); 00363 } 00364 } 00365 }} 00366 } 00367 00368 #endif