00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #ifndef MANAGER_HPP
00023 #define MANAGER_HPP
00024
00025 #include <map>
00026 #include <string>
00027 #include <queue>
00028 #include <algorithm>
00029 #include <cstring>
00030
00031 #include <boost/bind.hpp>
00032 #include <boost/shared_ptr.hpp>
00033 #include <boost/thread.hpp>
00034 #include <boost/thread/shared_mutex.hpp>
00035
00036 #include <signal.h>
00037 #include <pthread.h>
00038
00039 #include <fastcgi++/exceptions.hpp>
00040 #include <fastcgi++/protocol.hpp>
00041 #include <fastcgi++/transceiver.hpp>
00042
00044 namespace Fastcgipp
00045 {
00047
00057 template<typename T>
00058 class Manager
00059 {
00060 public:
00062
00071 Manager(int fd=0): transceiver(fd, boost::bind(&Manager::push, boost::ref(*this), _1, _2)), asleep(false), terminateBool(false), stopBool(false) { setupSignals(); instance=this; }
00072
00073 ~Manager() { instance=0; }
00074
00076
00083 void handler();
00084
00086
00101 void push(Protocol::FullId id, Message message);
00102
00104
00113 void stop();
00114
00115
00117
00124 void setupSignals();
00125 private:
00127 Transceiver transceiver;
00128
00130
00134 class Tasks: public std::queue<Protocol::FullId>, public boost::mutex {};
00136
00139 Tasks tasks;
00140
00142
00146 class Requests: public std::map<Protocol::FullId, boost::shared_ptr<T> >, public boost::shared_mutex {};
00148
00152 Requests requests;
00153
00155 std::queue<Message> messages;
00156
00158
00166 inline void localHandler(Protocol::FullId id);
00167
00169 bool asleep;
00171 boost::mutex sleepMutex;
00173
00177 pthread_t threadId;
00178
00180
00183 bool stopBool;
00185 boost::mutex stopMutex;
00187
00190 bool terminateBool;
00192 boost::mutex terminateMutex;
00193
00195 static void signalHandler(int signum);
00197 static Manager<T>* instance;
00199
00207 inline void terminate();
00208 };
00209 }
00210
00211 template<class T>
00212 Fastcgipp::Manager<T>* Fastcgipp::Manager<T>::instance=0;
00213
00214 template<class T>
00215 void Fastcgipp::Manager<T>::terminate()
00216 {
00217 boost::lock_guard<boost::mutex> lock(terminateMutex);
00218 terminateBool=true;
00219 }
00220
00221 template<class T>
00222 void Fastcgipp::Manager<T>::stop()
00223 {
00224 boost::lock_guard<boost::mutex> lock(stopMutex);
00225 stopBool=true;
00226 }
00227
00228 template<class T>
00229 void Fastcgipp::Manager<T>::signalHandler(int signum)
00230 {
00231 switch(signum)
00232 {
00233 case SIGUSR1:
00234 {
00235 if(instance) instance->terminate();
00236 break;
00237 }
00238 case SIGTERM:
00239 {
00240 if(instance) instance->stop();
00241 break;
00242 }
00243 }
00244 }
00245
00246 template<class T>
00247 void Fastcgipp::Manager<T>::setupSignals()
00248 {
00249 struct sigaction sigAction;
00250 sigAction.sa_handler=Fastcgipp::Manager<T>::signalHandler;
00251
00252 sigaction(SIGPIPE, &sigAction, NULL);
00253 sigaction(SIGUSR1, &sigAction, NULL);
00254 sigaction(SIGTERM, &sigAction, NULL);
00255 }
00256
00257 template<class T>
00258 void Fastcgipp::Manager<T>::push(Protocol::FullId id, Message message)
00259 {
00260 using namespace std;
00261 using namespace Protocol;
00262 using namespace boost;
00263
00264 if(id.fcgiId)
00265 {
00266 upgrade_lock<shared_mutex> reqLock(requests);
00267 typename Requests::iterator it(requests.find(id));
00268 if(it!=requests.end())
00269 {
00270 lock_guard<mutex> mesLock(it->second->messages);
00271 it->second->messages.push(message);
00272 lock_guard<mutex> tasksLock(tasks);
00273 tasks.push(id);
00274 }
00275 else if(!message.type)
00276 {
00277 Header& header=*(Header*)message.data.get();
00278 if(header.getType()==BEGIN_REQUEST)
00279 {
00280 BeginRequest& body=*(BeginRequest*)(message.data.get()+sizeof(Header));
00281 upgrade_to_unique_lock<shared_mutex> lock(reqLock);
00282 boost::shared_ptr<T>& request = requests[id];
00283 request.reset(new T);
00284 request->set(id, transceiver, body.getRole(), !body.getKeepConn(), boost::bind(&Manager::push, boost::ref(*this), id, _1));
00285 }
00286 else
00287 return;
00288 }
00289 }
00290 else
00291 {
00292 messages.push(message);
00293 tasks.push(id);
00294 }
00295
00296 lock_guard<mutex> sleepLock(sleepMutex);
00297 if(asleep)
00298 transceiver.wake();
00299 }
00300
00301 template<class T>
00302 void Fastcgipp::Manager<T>::handler()
00303 {
00304 using namespace std;
00305 using namespace boost;
00306
00307 threadId=pthread_self();
00308
00309 while(1)
00310 {{
00311 {
00312 lock_guard<mutex> stopLock(stopMutex);
00313 if(stopBool)
00314 {
00315 stopBool=false;
00316 return;
00317 }
00318 }
00319
00320 bool sleep=transceiver.handler();
00321
00322 {
00323 lock_guard<mutex> terminateLock(terminateMutex);
00324 if(terminateBool)
00325 {
00326 shared_lock<shared_mutex> requestsLock(requests);
00327 if(requests.empty() && sleep)
00328 {
00329 terminateBool=false;
00330 return;
00331 }
00332 }
00333 }
00334
00335 unique_lock<mutex> tasksLock(tasks);
00336 unique_lock<mutex> sleepLock(sleepMutex);
00337
00338 if(tasks.empty())
00339 {
00340 tasksLock.unlock();
00341
00342 asleep=true;
00343 sleepLock.unlock();
00344
00345 if(sleep) transceiver.sleep();
00346
00347 sleepLock.lock();
00348 asleep=false;
00349 sleepLock.unlock();
00350
00351 continue;
00352 }
00353
00354 sleepLock.unlock();
00355
00356 Protocol::FullId id=tasks.front();
00357 tasks.pop();
00358 tasksLock.unlock();
00359
00360 if(id.fcgiId==0)
00361 localHandler(id);
00362 else
00363 {
00364 upgrade_lock<shared_mutex> reqReadLock(requests);
00365 typename map<Protocol::FullId, boost::shared_ptr<T> >::iterator it(requests.find(id));
00366 if(it!=requests.end() && it->second->handler())
00367 {
00368 upgrade_to_unique_lock<shared_mutex> reqWriteLock(reqReadLock);
00369 requests.erase(it);
00370 }
00371 }
00372 }}
00373 }
00374
00375 template<class T>
00376 void Fastcgipp::Manager<T>::localHandler(Protocol::FullId id)
00377 {
00378 using namespace std;
00379 using namespace Protocol;
00380 Message message(messages.front());
00381 messages.pop();
00382
00383 if(!message.type)
00384 {
00385 const Header& header=*(Header*)message.data.get();
00386 switch(header.getType())
00387 {
00388 case GET_VALUES:
00389 {
00390 size_t nameSize;
00391 size_t valueSize;
00392 const char* name;
00393 const char* value;
00394 processParamHeader(message.data.get()+sizeof(Header), header.getContentLength(), name, nameSize, value, valueSize);
00395 if(nameSize==14 && !memcmp(name, "FCGI_MAX_CONNS", 14))
00396 {
00397 Block buffer(transceiver.requestWrite(sizeof(maxConnsReply)));
00398 memcpy(buffer.data, (const char*)&maxConnsReply, sizeof(maxConnsReply));
00399 transceiver.secureWrite(sizeof(maxConnsReply), id, false);
00400 }
00401 else if(nameSize==13 && !memcmp(name, "FCGI_MAX_REQS", 13))
00402 {
00403 Block buffer(transceiver.requestWrite(sizeof(maxReqsReply)));
00404 memcpy(buffer.data, (const char*)&maxReqsReply, sizeof(maxReqsReply));
00405 transceiver.secureWrite(sizeof(maxReqsReply), id, false);
00406 }
00407 else if(nameSize==15 && !memcmp(name, "FCGI_MPXS_CONNS", 15))
00408 {
00409 Block buffer(transceiver.requestWrite(sizeof(mpxsConnsReply)));
00410 memcpy(buffer.data, (const char*)&mpxsConnsReply, sizeof(mpxsConnsReply));
00411 transceiver.secureWrite(sizeof(mpxsConnsReply), id, false);
00412 }
00413
00414 break;
00415 }
00416
00417 default:
00418 {
00419 Block buffer(transceiver.requestWrite(sizeof(Header)+sizeof(UnknownType)));
00420
00421 Header& sendHeader=*(Header*)buffer.data;
00422 sendHeader.setVersion(version);
00423 sendHeader.setType(UNKNOWN_TYPE);
00424 sendHeader.setRequestId(0);
00425 sendHeader.setContentLength(sizeof(UnknownType));
00426 sendHeader.setPaddingLength(0);
00427
00428 UnknownType& sendBody=*(UnknownType*)(buffer.data+sizeof(Header));
00429 sendBody.setType(header.getType());
00430
00431 transceiver.secureWrite(sizeof(Header)+sizeof(UnknownType), id, false);
00432
00433 break;
00434 }
00435 }
00436 }
00437 }
00438
00439 #endif