fastcgi++
transceiver.cpp
Go to the documentation of this file.
1 
2 /***************************************************************************
3 * Copyright (C) 2007 Eddie Carle [eddie@erctech.org] *
4 * *
5 * This file is part of fastcgi++. *
6 * *
7 * fastcgi++ is free software: you can redistribute it and/or modify it *
8 * under the terms of the GNU Lesser General Public License as published *
9 * by the Free Software Foundation, either version 3 of the License, or (at *
10 * your option) any later version. *
11 * *
12 * fastcgi++ is distributed in the hope that it will be useful, but WITHOUT *
13 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or *
14 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public *
15 * License for more details. *
16 * *
17 * You should have received a copy of the GNU Lesser General Public License *
18 * along with fastcgi++. If not, see <http://www.gnu.org/licenses/>. *
19 ****************************************************************************/
20 
21 
23 
// Body of the outgoing-data loop (presumably Transceiver::transmit(); the
// signature line is not part of this listing).
// Repeatedly write()s the current sendBlock to sendBlock.fd and releases the
// written bytes from `buffer`. The loop ends when sendBlock.size is zero or
// when a write did not cover the whole block. Returns buffer.empty().
// Throws Exceptions::SocketWrite for write errors other than EPIPE, EBADF and
// EAGAIN.
25 {
26  while(1)
27  {{
    // NOTE(review): the declaration of sendBlock (listing line 28) is missing
    // from this listing; presumably it is obtained from `buffer` - confirm.
29  if(sendBlock.size)
30  {
31  ssize_t sent = write(sendBlock.fd, sendBlock.data, sendBlock.size);
32  if(sent<0)
33  {
34  if(errno==EPIPE || errno==EBADF)
35  {
    // The descriptor is unusable: drop it and pretend the whole block was
    // sent so that it gets released from the buffer below.
36  freeFd(sendBlock.fd);
37  sent=sendBlock.size;
38  }
39  else if(errno!=EAGAIN) throw Exceptions::SocketWrite(sendBlock.fd, errno);
    // NOTE(review): on EAGAIN `sent` is still negative here and is passed to
    // buffer.freeRead() below - confirm that this path is intended.
40  }
41 
42  buffer.freeRead(sent);
    // A short write means the descriptor cannot take more right now.
43  if(sent!=(ssize_t)sendBlock.size)
44  break;
45  }
46  else
47  break;
48  }}
49 
50  return buffer.empty();
51 }
52 
// Body of the write-commit routine (presumably Transceiver::Buffer::secureWrite();
// the signature line is not part of this listing, so `size`, `kill` and `id`
// are presumably its parameters).
// Advances the end marker of the current write chunk by `size` bytes and
// queues a Frame(size, kill, id) describing them.
54 {
55  writeIt->end+=size;
    // If less than minBlockSize remains between writeIt->end and
    // data.get()+Chunk::size, move on to the next chunk; when there is no next
    // chunk, append a new one and step back so writeIt refers to it.
56  if(minBlockSize>(writeIt->data.get()+Chunk::size-writeIt->end) && ++writeIt==chunks.end())
57  {
58  chunks.push_back(Chunk());
59  --writeIt;
60  }
61  frames.push(Frame(size, kill, id));
62 }
63 
// Body of the polling routine (presumably Transceiver::handler(); the
// signature line is not part of this listing).
// One pass: calls transmit(), polls all descriptors with a zero timeout and
// services a single descriptor: error/hang-up, a new connection on the
// listening socket, a wake-up byte, or incoming record data.
// Returns the result of transmit() when poll() reported nothing or when a
// record is still incomplete; returns false in every other case.
// Throws Exceptions::SocketPoll when poll() fails and Exceptions::SocketRead
// when read() fails with anything other than EAGAIN.
65 {
66  using namespace std;
67  using namespace Protocol;
68 
69  bool transmitEmpty=transmit();
70 
    // Timeout 0: poll() returns immediately.
71  int retVal=poll(&pollFds.front(), pollFds.size(), 0);
72  if(retVal==0)
73  {
74  if(transmitEmpty) return true;
75  else return false;
76  }
77  if(retVal<0) throw Exceptions::SocketPoll(errno);
78 
    // NOTE(review): the result is dereferenced without being compared against
    // pollFds.end(); the semantics of reventsZero are not visible here.
79  std::vector<pollfd>::iterator pollFd = find_if(pollFds.begin(), pollFds.end(), reventsZero);
80 
81  if(pollFd->revents & (POLLHUP|POLLERR|POLLNVAL) )
82  {
    // NOTE(review): the descriptor is removed from both tables but close() is
    // not called in this branch - confirm it is closed elsewhere.
83  fdBuffers.erase(pollFd->fd);
84  pollFds.erase(pollFd);
85  return false;
86  }
87 
88  int fd=pollFd->fd;
89  if(fd==socket)
90  {
    // Activity on the listening socket: accept the connection and register it.
    // NOTE(review): the return value of accept() is not checked.
91  sockaddr_un addr;
92  socklen_t addrlen=sizeof(sockaddr_un);
93  fd=accept(fd, (sockaddr*)&addr, &addrlen);
    // (flags|O_NONBLOCK)^O_NONBLOCK equals flags&~O_NONBLOCK, i.e. this clears
    // O_NONBLOCK on the accepted descriptor.
94  fcntl(fd, F_SETFL, (fcntl(fd, F_GETFL)|O_NONBLOCK)^O_NONBLOCK);
95 
    // NOTE(review): push_back may reallocate pollFds, which invalidates the
    // iterator `pollFd`; it is dereferenced again on the actual==0 path below.
96  pollFds.push_back(pollfd());
97  pollFds.back().fd = fd;
98  pollFds.back().events = POLLIN|POLLHUP|POLLERR|POLLNVAL;
99 
100  Message& messageBuffer=fdBuffers[fd].messageBuffer;
101  messageBuffer.size=0;
102  messageBuffer.type=0;
    // Execution continues below and immediately tries to read from the new
    // descriptor.
103  }
104  else if(fd==wakeUpFdIn)
105  {
    // Consume one wake-up byte; the result of read() is ignored.
106  char x;
107  read(wakeUpFdIn, &x, 1);
108  return false;
109  }
110 
111  Message& messageBuffer=fdBuffers[fd].messageBuffer;
112  Header& headerBuffer=fdBuffers[fd].headerBuffer;
113 
114  ssize_t actual;
115  // Are we in the process of receiving some part of a frame?
    // No data buffer allocated yet means the header is still being collected;
    // messageBuffer.size counts the bytes received so far.
116  if(!messageBuffer.data)
117  {
118  // Are we receiving a partial header or new?
119  actual=read(fd, (char*)&headerBuffer+messageBuffer.size, sizeof(Header)-messageBuffer.size);
120  if(actual<0 && errno!=EAGAIN) throw Exceptions::SocketRead(fd, errno);
121  if(actual>0) messageBuffer.size+=actual;
122 
    // read() returned 0: end of stream, forget the descriptor.
    // NOTE(review): close() is not called here either.
123  if( actual == 0 )
124  {
125  fdBuffers.erase( pollFd->fd );
126  pollFds.erase( pollFd );
127  return false;
128  }
129 
    // Header still incomplete: wait for the next pass.
130  if(messageBuffer.size!=sizeof(Header))
131  {
132  if(transmitEmpty) return true;
133  else return false;
134  }
135 
    // Header complete: allocate room for header + content + padding and copy
    // the header to the front of the new buffer.
136  messageBuffer.data.reset(new char[sizeof(Header)+headerBuffer.getContentLength()+headerBuffer.getPaddingLength()]);
137  memcpy(static_cast<void*>(messageBuffer.data.get()), static_cast<const void*>(&headerBuffer), sizeof(Header));
138  }
139 
    // Read as much of the remaining content and padding as is available.
140  const Header& header=*(const Header*)messageBuffer.data.get();
141  size_t needed=header.getContentLength()+header.getPaddingLength()+sizeof(Header)-messageBuffer.size;
142  actual=read(fd, messageBuffer.data.get()+messageBuffer.size, needed);
143  if(actual<0 && errno!=EAGAIN) throw Exceptions::SocketRead(fd, errno);
144  if(actual>0) messageBuffer.size+=actual;
145 
146  // Did we receive a full frame?
147  if(actual==(ssize_t)needed)
148  {
    // Hand the complete record to the callback and reset for the next one.
149  sendMessage(FullId(headerBuffer.getRequestId(), fd), messageBuffer);
150  messageBuffer.size=0;
151  messageBuffer.data.reset();
152  return false;
153  }
154  if(transmitEmpty) return true;
155  else return false;
156 }
157 
// Body of the read-release routine (presumably Transceiver::Buffer::freeRead();
// the signature line is not part of this listing, so `size` is presumably its
// parameter).
// Advances the read pointer by `size` bytes, recycles or discards the first
// chunk once it has been read up to its end, and subtracts `size` from the
// frame at the front of the frame queue.
159 {
160  pRead+=size;
    // Has everything written to the first chunk been read?
161  if(pRead>=chunks.begin()->end)
162  {
163  if(writeIt==chunks.begin())
164  {
    // Reading and writing share the first chunk: rewind both to its start.
165  pRead=writeIt->data.get();
166  writeIt->end=pRead;
167  }
168  else
169  {
170  if(writeIt==--chunks.end())
171  {
    // The writer is on the last chunk: reset the first chunk and move it to
    // the back of the list so it can be written to again.
172  chunks.begin()->end=chunks.begin()->data.get();
173  chunks.splice(chunks.end(), chunks, chunks.begin());
174  }
175  else
176  chunks.pop_front();
177  pRead=chunks.begin()->data.get();
178  }
179  }
    // NOTE(review): presumably callers never release more than the front
    // frame's remaining size in a single call - confirm.
180  if((frames.front().size-=size)==0)
181  {
    // The front frame is fully released; release its descriptor too if the
    // frame was flagged with closeFd.
182  if(frames.front().closeFd)
183  freeFd(frames.front().id.fd);
184  frames.pop();
185  }
186 
187 }
188 
// Body of the wake-up routine (presumably Transceiver::wake(); the signature
// line is not part of this listing).
// Writes a single zero byte to wakeUpFdOut. The return value of write() is
// ignored.
190 {
191  char x=0;
192  write(wakeUpFdOut, &x, 1);
193 }
194 
195 Fastcgipp::Transceiver::Transceiver(int fd_, boost::function<void(Protocol::FullId, Message)> sendMessage_)
196 :buffer(pollFds, fdBuffers), sendMessage(sendMessage_), pollFds(2), socket(fd_)
197 {
198  socket=fd_;
199 
200  // Let's setup a in/out socket for waking up poll()
201  int socPair[2];
202  socketpair(AF_UNIX, SOCK_STREAM, 0, socPair);
203  wakeUpFdIn=socPair[0];
204  fcntl(wakeUpFdIn, F_SETFL, (fcntl(wakeUpFdIn, F_GETFL)|O_NONBLOCK)^O_NONBLOCK);
205  wakeUpFdOut=socPair[1];
206 
207  fcntl(socket, F_SETFL, (fcntl(socket, F_GETFL)|O_NONBLOCK)^O_NONBLOCK);
208  pollFds[0].events = POLLIN|POLLHUP;
209  pollFds[0].fd = socket;
210  pollFds[1].events = POLLIN|POLLHUP;
211  pollFds[1].fd = wakeUpFdIn;
212 }
213 
// Body of an exception constructor (presumably Exceptions::SocketWrite; the
// signature line is not part of this listing).
// Assigns to `msg` a description of a write() failure, selected by the current
// value of the global `errno`. Values not listed leave `msg` unassigned here.
// NOTE(review): this switches on the global errno rather than on an error
// code stored in the exception - confirm that errno cannot change between the
// failing call and this constructor.
215 {
216  switch(errno)
217  {
218  case EAGAIN:
219  msg = "The file descriptor has been marked non-blocking (O_NONBLOCK) and the write would block.";
220  break;
221 
222  case EBADF:
223  msg = "The file descriptor is not a valid file descriptor or is not open for writing.";
224  break;
225 
226  case EFAULT:
227  msg = "The buffer is outside your accessible address space.";
228  break;
229 
230  case EFBIG:
231  msg = "An attempt was made to write a file that exceeds the implementation-defined maximum file size or the process’s file size limit, or to write at a position past the maximum allowed offset.";
232  break;
233 
234  case EINTR:
235  msg = "The call was interrupted by a signal before any data was written; see signal(7).";
236  break;
237 
238  case EINVAL:
239  msg = "The file descriptor is attached to an object which is unsuitable for writing; or the file was opened with the O_DIRECT flag, and either the address specified for the buffer, the value specified in count, or the current file offset is not suitably aligned.";
240  break;
241 
242  case EIO:
243  msg = "A low-level I/O error occurred while modifying the inode.";
244  break;
245 
246  case ENOSPC:
247  msg = "The device containing the file referred to by the file descriptor has no room for the data.";
248  break;
249 
250  case EPIPE:
251  msg = "The file descriptor is connected to a pipe or socket whose reading end is closed. When this happens the writing process will also receive a SIGPIPE signal. (Thus, the write return value is seen only if the program catches, blocks or ignores this signal.)";
252  break;
253  }
254 }
255 
// Body of an exception constructor (presumably Exceptions::SocketRead; the
// signature line is not part of this listing).
// Assigns to `msg` a description of a read() failure, selected by the current
// value of the global `errno`. Values not listed leave `msg` unassigned here.
// NOTE(review): this switches on the global errno rather than on an error
// code stored in the exception - confirm that errno cannot change between the
// failing call and this constructor.
257 {
258  switch(errno)
259  {
260  case EAGAIN:
261  msg = "Non-blocking I/O has been selected using O_NONBLOCK and no data was immediately available for reading.";
262  break;
263 
264  case EBADF:
265  msg = "The file descriptor is not valid or is not open for reading.";
266  break;
267 
268  case EFAULT:
269  msg = "The buffer is outside your accessible address space.";
270  break;
271 
272  case EINTR:
    // NOTE(review): the text below says "written" although this is the read
    // case; it matches the write message and looks like a copy-paste slip.
273  msg = "The call was interrupted by a signal before any data was written; see signal(7).";
274  break;
275 
276  case EINVAL:
277  msg = "The file descriptor is attached to an object which is unsuitable for reading; or the file was opened with the O_DIRECT flag, and either the address specified in buf, the value specified in count, or the current file offset is not suitably aligned.";
278  break;
279 
280  case EIO:
281  msg = "I/O error. This will happen for example when the process is in a background process group, tries to read from its controlling tty, and either it is ignoring or blocking SIGTTIN or its process group is orphaned. It may also occur when there is a low-level I/O error while reading from a disk or tape.";
282  break;
283 
284  case EISDIR:
285  msg = "The file descriptor refers to a directory.";
286  break;
287  }
288 }
289 
// Body of an exception constructor (presumably Exceptions::SocketPoll; the
// signature line is not part of this listing).
// Assigns to `msg` a description of a poll() failure, selected by the current
// value of the global `errno`. Values not listed leave `msg` unassigned here.
// NOTE(review): this switches on the global errno rather than on an error
// code stored in the exception - confirm that errno cannot change between the
// failing call and this constructor.
291 {
292  switch(errno)
293  {
294  case EBADF:
295  msg = "An invalid file descriptor was given in one of the sets.";
296  break;
297 
298  case EFAULT:
299  msg = "The array given as argument was not contained in the calling program’s address space.";
300  break;
301 
302  case EINTR:
303  msg = "A signal occurred before any requested event; see signal(7).";
304  break;
305 
306  case EINVAL:
307  msg = "The nfds value exceeds the RLIMIT_NOFILE value.";
308  break;
309 
310  case ENOMEM:
311  msg = "There was no space to allocate file descriptor tables.";
312  break;
313  }
314 }
315 
316 void Fastcgipp::Transceiver::freeFd(int fd, std::vector<pollfd>& pollFds, std::map<int, fdBuffer>& fdBuffers)
317 {
318  std::vector<pollfd>::iterator it=std::find_if(pollFds.begin(), pollFds.end(), equalsFd(fd));
319  if(it != pollFds.end())
320  {
321  pollFds.erase(it);
322  close(fd);
323  fdBuffers.erase(fd);
324  }
325 }