fastcgi++
transceiver.cpp
Go to the documentation of this file.
00001 
00002 /***************************************************************************
00003 * Copyright (C) 2007 Eddie Carle [eddie@erctech.org]                       *
00004 *                                                                          *
00005 * This file is part of fastcgi++.                                          *
00006 *                                                                          *
00007 * fastcgi++ is free software: you can redistribute it and/or modify it     *
00008 * under the terms of the GNU Lesser General Public License as  published   *
00009 * by the Free Software Foundation, either version 3 of the License, or (at *
00010 * your option) any later version.                                          *
00011 *                                                                          *
00012 * fastcgi++ is distributed in the hope that it will be useful, but WITHOUT *
00013 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or    *
00014 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public     *
00015 * License for more details.                                                *
00016 *                                                                          *
00017 * You should have received a copy of the GNU Lesser General Public License *
00018 * along with fastcgi++.  If not, see <http://www.gnu.org/licenses/>.       *
00019 ****************************************************************************/
00020 
00021 
00022 #include <fastcgi++/transceiver.hpp>
00023 
00024 int Fastcgipp::Transceiver::transmit()
00025 {
00026    while(1)
00027    {{
00028       Buffer::SendBlock sendBlock(buffer.requestRead());
00029       if(sendBlock.size)
00030       {
00031          ssize_t sent = write(sendBlock.fd, sendBlock.data, sendBlock.size);
00032          if(sent<0)
00033          {
00034             if(errno==EPIPE || errno==EBADF)
00035             {
00036                freeFd(sendBlock.fd);
00037                sent=sendBlock.size;
00038             }
00039             else if(errno!=EAGAIN) throw Exceptions::SocketWrite(sendBlock.fd, errno);
00040          }
00041 
00042          buffer.freeRead(sent);
00043          if(sent!=(ssize_t)sendBlock.size)
00044             break;
00045       }
00046       else
00047          break;
00048    }}
00049 
00050    return buffer.empty();
00051 }
00052 
00053 void Fastcgipp::Transceiver::Buffer::secureWrite(size_t size, Protocol::FullId id, bool kill)
00054 {
00055    writeIt->end+=size;
00056    if(minBlockSize>(writeIt->data.get()+Chunk::size-writeIt->end) && ++writeIt==chunks.end())
00057    {
00058       chunks.push_back(Chunk());
00059       --writeIt;
00060    }
00061    frames.push(Frame(size, kill, id));
00062 }
00063 
00064 bool Fastcgipp::Transceiver::handler()
00065 {
00066    using namespace std;
00067    using namespace Protocol;
00068 
00069    bool transmitEmpty=transmit();
00070 
00071    int retVal=poll(&pollFds.front(), pollFds.size(), 0);
00072    if(retVal==0)
00073    {
00074       if(transmitEmpty) return true;
00075       else return false;
00076    }
00077    if(retVal<0) throw Exceptions::SocketPoll(errno);
00078    
00079    std::vector<pollfd>::iterator pollFd = find_if(pollFds.begin(), pollFds.end(), reventsZero);
00080 
00081    if(pollFd->revents & (POLLHUP|POLLERR|POLLNVAL) )
00082    {
00083       fdBuffers.erase(pollFd->fd);
00084       pollFds.erase(pollFd);
00085       return false;
00086    }
00087    
00088    int fd=pollFd->fd;
00089    if(fd==socket)
00090    {
00091       sockaddr_un addr;
00092       socklen_t addrlen=sizeof(sockaddr_un);
00093       fd=accept(fd, (sockaddr*)&addr, &addrlen);
00094       fcntl(fd, F_SETFL, (fcntl(fd, F_GETFL)|O_NONBLOCK)^O_NONBLOCK);
00095       
00096       pollFds.push_back(pollfd());
00097       pollFds.back().fd = fd;
00098       pollFds.back().events = POLLIN|POLLHUP|POLLERR|POLLNVAL;
00099 
00100       Message& messageBuffer=fdBuffers[fd].messageBuffer;
00101       messageBuffer.size=0;
00102       messageBuffer.type=0;
00103    }
00104    else if(fd==wakeUpFdIn)
00105    {
00106       char x;
00107       read(wakeUpFdIn, &x, 1);
00108       return false;
00109    }
00110    
00111    Message& messageBuffer=fdBuffers[fd].messageBuffer;
00112    Header& headerBuffer=fdBuffers[fd].headerBuffer;
00113 
00114    ssize_t actual;
00115    // Are we in the process of recieving some part of a frame?
00116    if(!messageBuffer.data)
00117    {
00118       // Are we recieving a partial header or new?
00119       actual=read(fd, (char*)&headerBuffer+messageBuffer.size, sizeof(Header)-messageBuffer.size);
00120       if(actual<0 && errno!=EAGAIN) throw Exceptions::SocketRead(fd, errno);
00121       if(actual>0) messageBuffer.size+=actual;
00122       
00123       if( actual == 0 )
00124       {
00125          fdBuffers.erase( pollFd->fd );
00126          pollFds.erase( pollFd );
00127          return false;
00128       }
00129 
00130       if(messageBuffer.size!=sizeof(Header))
00131       {
00132          if(transmitEmpty) return true;
00133          else return false;
00134       }
00135 
00136       messageBuffer.data.reset(new char[sizeof(Header)+headerBuffer.getContentLength()+headerBuffer.getPaddingLength()]);
00137       memcpy(static_cast<void*>(messageBuffer.data.get()), static_cast<const void*>(&headerBuffer), sizeof(Header));
00138    }
00139 
00140    const Header& header=*(const Header*)messageBuffer.data.get();
00141    size_t needed=header.getContentLength()+header.getPaddingLength()+sizeof(Header)-messageBuffer.size;
00142    actual=read(fd, messageBuffer.data.get()+messageBuffer.size, needed);
00143    if(actual<0 && errno!=EAGAIN) throw Exceptions::SocketRead(fd, errno);
00144    if(actual>0) messageBuffer.size+=actual;
00145 
00146    // Did we recieve a full frame?
00147    if(actual==(ssize_t)needed)
00148    {     
00149       sendMessage(FullId(headerBuffer.getRequestId(), fd), messageBuffer);
00150       messageBuffer.size=0;
00151       messageBuffer.data.reset();
00152       return false;
00153    }
00154    if(transmitEmpty) return true;
00155    else return false;
00156 }
00157 
00158 void Fastcgipp::Transceiver::Buffer::freeRead(size_t size)
00159 {
00160    pRead+=size;
00161    if(pRead>=chunks.begin()->end)
00162    {
00163       if(writeIt==chunks.begin())
00164       {
00165          pRead=writeIt->data.get();
00166          writeIt->end=pRead;
00167       }
00168       else
00169       {
00170          if(writeIt==--chunks.end())
00171          {
00172             chunks.begin()->end=chunks.begin()->data.get();
00173             chunks.splice(chunks.end(), chunks, chunks.begin());
00174          }
00175          else
00176             chunks.pop_front();
00177          pRead=chunks.begin()->data.get();
00178       }
00179    }
00180    if((frames.front().size-=size)==0)
00181    {
00182       if(frames.front().closeFd)
00183          freeFd(frames.front().id.fd);
00184       frames.pop();
00185    }
00186 
00187 }
00188 
00189 void Fastcgipp::Transceiver::wake()
00190 {
00191    char x=0;
00192    write(wakeUpFdOut, &x, 1);
00193 }
00194 
00195 Fastcgipp::Transceiver::Transceiver(int fd_, boost::function<void(Protocol::FullId, Message)> sendMessage_)
00196 :buffer(pollFds, fdBuffers), sendMessage(sendMessage_), pollFds(2), socket(fd_)
00197 {
00198    socket=fd_;
00199    
00200    // Let's setup a in/out socket for waking up poll()
00201    int socPair[2];
00202    socketpair(AF_UNIX, SOCK_STREAM, 0, socPair);
00203    wakeUpFdIn=socPair[0];
00204    fcntl(wakeUpFdIn, F_SETFL, (fcntl(wakeUpFdIn, F_GETFL)|O_NONBLOCK)^O_NONBLOCK);  
00205    wakeUpFdOut=socPair[1]; 
00206    
00207    fcntl(socket, F_SETFL, (fcntl(socket, F_GETFL)|O_NONBLOCK)^O_NONBLOCK);
00208    pollFds[0].events = POLLIN|POLLHUP;
00209    pollFds[0].fd = socket;
00210    pollFds[1].events = POLLIN|POLLHUP;
00211    pollFds[1].fd = wakeUpFdIn;
00212 }
00213 
00214 Fastcgipp::Exceptions::SocketWrite::SocketWrite(int fd_, int erno_): Socket(fd_, erno_)
00215 {
00216    switch(errno)
00217    {
00218       case EAGAIN:
00219          msg = "The file descriptor has been marked non-blocking (O_NONBLOCK) and the write would block.";
00220          break;
00221 
00222       case EBADF:
00223          msg = "The file descriptor is not a valid file descriptor or is not open for writing.";
00224          break;
00225 
00226       case EFAULT:
00227          msg = "The buffer is outside your accessible address space.";
00228          break;
00229 
00230       case EFBIG:
00231          msg = "An attempt was made to write a file that exceeds the implementation-defined maximum file size or the process’s file size limit, or to write at a position past the maximum allowed offset.";
00232          break;
00233 
00234       case EINTR:
00235          msg = "The call was interrupted by a signal before any data was written; see signal(7).";
00236          break;
00237 
00238       case EINVAL:
00239          msg = "The file descriptor is attached to an object which is unsuitable for writing; or the file was opened with the O_DIRECT flag, and either the address specified for the buffer, the value specified in count, or the current file offset is not suitably aligned.";
00240          break;
00241 
00242       case EIO:
00243          msg = "A low-level I/O error occurred while modifying the inode.";
00244          break;
00245 
00246       case ENOSPC:
00247          msg = "The device containing the file referred to by the file descriptor has no room for the data.";
00248          break;
00249 
00250       case EPIPE:
00251          msg = "The file descriptor is connected to a pipe or socket whose reading end is closed.  When this happens the writing process will also receive a SIGPIPE signal.  (Thus, the write return value is seen only if the program catches, blocks or ignores this signal.)";
00252          break;
00253    }
00254 }
00255 
00256 Fastcgipp::Exceptions::SocketRead::SocketRead(int fd_, int erno_): Socket(fd_, erno_)
00257 {
00258    switch(errno)
00259    {
00260       case EAGAIN:
00261          msg = "Non-blocking I/O has been selected using O_NONBLOCK and no data was immediately available for reading.";
00262          break;
00263 
00264       case EBADF:
00265          msg = "The file descriptor is not valid or is not open for reading.";
00266          break;
00267 
00268       case EFAULT:
00269          msg = "The buffer is outside your accessible address space.";
00270          break;
00271 
00272       case EINTR:
00273          msg = "The call was interrupted by a signal before any data was written; see signal(7).";
00274          break;
00275 
00276       case EINVAL:
00277          msg = "The file descriptor is attached to an object which is unsuitable for reading; or the file was opened with the O_DIRECT flag, and either the address specified in buf, the value specified in count, or the current file offset is not suitably aligned.";
00278          break;
00279 
00280       case EIO:
00281          msg = "I/O error.  This will happen for example when the process is in a background process group, tries to read from its controlling tty, and either it is ignoring or blocking SIGTTIN or its process group is orphaned.  It may also occur when there is a low-level I/O error while reading from a disk or tape.";
00282          break;
00283 
00284       case EISDIR:
00285          msg = "The file descriptor refers to a directory.";
00286          break;
00287    }
00288 }
00289 
00290 Fastcgipp::Exceptions::SocketPoll::SocketPoll(int erno_): CodedException(0, erno_)
00291 {
00292    switch(errno)
00293    {
00294       case EBADF:
00295          msg = "An invalid file descriptor was given in one of the sets.";
00296          break;
00297 
00298       case EFAULT:
00299          msg = "The array given as argument was not contained in the calling program’s address space.";
00300          break;
00301 
00302       case EINTR:
00303          msg = "A signal occurred before any requested event; see signal(7).";
00304          break;
00305 
00306       case EINVAL:
00307          msg = "The nfds value exceeds the RLIMIT_NOFILE value.";
00308          break;
00309 
00310       case ENOMEM:
00311          msg = "There was no space to allocate file descriptor tables.";
00312          break;
00313    }
00314 }
00315 
00316 void Fastcgipp::Transceiver::freeFd(int fd, std::vector<pollfd>& pollFds, std::map<int, fdBuffer>& fdBuffers)
00317 {
00318    std::vector<pollfd>::iterator it=std::find_if(pollFds.begin(), pollFds.end(), equalsFd(fd));
00319    if(it != pollFds.end())
00320    {
00321       pollFds.erase(it);
00322       close(fd);
00323       fdBuffers.erase(fd);
00324    }
00325 }