00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include <fastcgi++/transceiver.hpp>
00023
00024 int Fastcgipp::Transceiver::transmit()
00025 {
00026 while(1)
00027 {{
00028 Buffer::SendBlock sendBlock(buffer.requestRead());
00029 if(sendBlock.size)
00030 {
00031 ssize_t sent = write(sendBlock.fd, sendBlock.data, sendBlock.size);
00032 if(sent<0)
00033 {
00034 if(errno==EPIPE)
00035 {
00036 pollFds.erase(std::find_if(pollFds.begin(), pollFds.end(), equalsFd(sendBlock.fd)));
00037 fdBuffers.erase(sendBlock.fd);
00038 sent=sendBlock.size;
00039 }
00040 else if(errno!=EAGAIN) throw Exceptions::SocketWrite(sendBlock.fd, errno);
00041 }
00042
00043 buffer.freeRead(sent);
00044 if(sent!=sendBlock.size)
00045 break;
00046 }
00047 else
00048 break;
00049 }}
00050
00051 return buffer.empty();
00052 }
00053
00054 void Fastcgipp::Transceiver::Buffer::secureWrite(size_t size, Protocol::FullId id, bool kill)
00055 {
00056 writeIt->end+=size;
00057 if(minBlockSize>(writeIt->data.get()+Chunk::size-writeIt->end) && ++writeIt==chunks.end())
00058 {
00059 chunks.push_back(Chunk());
00060 --writeIt;
00061 }
00062 frames.push(Frame(size, kill, id));
00063 }
00064
00065 bool Fastcgipp::Transceiver::handler()
00066 {
00067 using namespace std;
00068 using namespace Protocol;
00069
00070 bool transmitEmpty=transmit();
00071
00072 int retVal=poll(&pollFds.front(), pollFds.size(), 0);
00073 if(retVal==0)
00074 {
00075 if(transmitEmpty) return true;
00076 else return false;
00077 }
00078 if(retVal<0) throw Exceptions::Poll(errno);
00079
00080 std::vector<pollfd>::iterator pollFd = find_if(pollFds.begin(), pollFds.end(), reventsZero);
00081
00082 if(pollFd->revents&POLLHUP)
00083 {
00084 fdBuffers.erase(pollFd->fd);
00085 pollFds.erase(pollFd);
00086 return false;
00087 }
00088
00089 int fd=pollFd->fd;
00090 if(fd==socket)
00091 {
00092 sockaddr_un addr;
00093 socklen_t addrlen=sizeof(sockaddr_un);
00094 fd=accept(fd, (sockaddr*)&addr, &addrlen);
00095 fcntl(fd, F_SETFL, (fcntl(fd, F_GETFL)|O_NONBLOCK)^O_NONBLOCK);
00096
00097 pollFds.push_back(pollfd());
00098 pollFds.back().fd = fd;
00099 pollFds.back().events = POLLIN|POLLHUP;
00100
00101 Message& messageBuffer=fdBuffers[fd].messageBuffer;
00102 messageBuffer.size=0;
00103 messageBuffer.type=0;
00104 }
00105 else if(fd==wakeUpFdIn)
00106 {
00107 char x;
00108 read(wakeUpFdIn, &x, 1);
00109 return false;
00110 }
00111
00112 Message& messageBuffer=fdBuffers[fd].messageBuffer;
00113 Header& headerBuffer=fdBuffers[fd].headerBuffer;
00114
00115 ssize_t actual;
00116
00117 if(!messageBuffer.data)
00118 {
00119
00120 actual=read(fd, (char*)&headerBuffer+messageBuffer.size, sizeof(Header)-messageBuffer.size);
00121 if(actual<0 && errno!=EAGAIN) throw Exceptions::SocketRead(fd, errno);
00122 if(actual>0) messageBuffer.size+=actual;
00123 if(messageBuffer.size!=sizeof(Header))
00124 {
00125 if(transmitEmpty) return true;
00126 else return false;
00127 }
00128
00129 messageBuffer.data.reset(new char[sizeof(Header)+headerBuffer.getContentLength()+headerBuffer.getPaddingLength()]);
00130 memcpy(static_cast<void*>(messageBuffer.data.get()), static_cast<const void*>(&headerBuffer), sizeof(Header));
00131 }
00132
00133 const Header& header=*(const Header*)messageBuffer.data.get();
00134 size_t needed=header.getContentLength()+header.getPaddingLength()+sizeof(Header)-messageBuffer.size;
00135 actual=read(fd, messageBuffer.data.get()+messageBuffer.size, needed);
00136 if(actual<0 && errno!=EAGAIN) throw Exceptions::SocketRead(fd, errno);
00137 if(actual>0) messageBuffer.size+=actual;
00138
00139
00140 if(actual==needed)
00141 {
00142 sendMessage(FullId(headerBuffer.getRequestId(), fd), messageBuffer);
00143 messageBuffer.size=0;
00144 messageBuffer.data.reset();
00145 return false;
00146 }
00147 if(transmitEmpty) return true;
00148 else return false;
00149 }
00150
00151 void Fastcgipp::Transceiver::Buffer::freeRead(size_t size)
00152 {
00153 pRead+=size;
00154 if(pRead>=chunks.begin()->end)
00155 {
00156 if(writeIt==chunks.begin())
00157 {
00158 pRead=writeIt->data.get();
00159 writeIt->end=pRead;
00160 }
00161 else
00162 {
00163 if(writeIt==--chunks.end())
00164 {
00165 chunks.begin()->end=chunks.begin()->data.get();
00166 chunks.splice(chunks.end(), chunks, chunks.begin());
00167 }
00168 else
00169 chunks.pop_front();
00170 pRead=chunks.begin()->data.get();
00171 }
00172 }
00173 if((frames.front().size-=size)==0)
00174 {
00175 if(frames.front().closeFd)
00176 {
00177 pollFds.erase(std::find_if(pollFds.begin(), pollFds.end(), equalsFd(frames.front().id.fd)));
00178 close(frames.front().id.fd);
00179 fdBuffers.erase(frames.front().id.fd);
00180 }
00181 frames.pop();
00182 }
00183
00184 }
00185
00186 void Fastcgipp::Transceiver::wake()
00187 {
00188 char x;
00189 write(wakeUpFdOut, &x, 1);
00190 }
00191
00192 Fastcgipp::Transceiver::Transceiver(int fd_, boost::function<void(Protocol::FullId, Message)> sendMessage_)
00193 :sendMessage(sendMessage_), pollFds(2), socket(fd_), buffer(pollFds, fdBuffers)
00194 {
00195 socket=fd_;
00196
00197
00198 int socPair[2];
00199 socketpair(AF_UNIX, SOCK_STREAM, 0, socPair);
00200 wakeUpFdIn=socPair[0];
00201 fcntl(wakeUpFdIn, F_SETFL, (fcntl(wakeUpFdIn, F_GETFL)|O_NONBLOCK)^O_NONBLOCK);
00202 wakeUpFdOut=socPair[1];
00203
00204 fcntl(socket, F_SETFL, (fcntl(socket, F_GETFL)|O_NONBLOCK)^O_NONBLOCK);
00205 pollFds[0].events = POLLIN|POLLHUP;
00206 pollFds[0].fd = socket;
00207 pollFds[1].events = POLLIN|POLLHUP;
00208 pollFds[1].fd = wakeUpFdIn;
00209 }