Fawkes API  Fawkes Development Version
server.cpp
1 
2 /***************************************************************************
3  * server.cpp - Protobuf stream protocol - server
4  *
5  * Created: Thu Jan 31 14:57:16 2013
6  * Copyright 2013 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  *
13  * - Redistributions of source code must retain the above copyright
14  * notice, this list of conditions and the following disclaimer.
15  * - Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  * - Neither the name of the authors nor the names of its contributors
20  * may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
27  * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
28  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
29  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
30  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
31  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
32  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
34  * OF THE POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #include <protobuf_comm/server.h>
38 
39 #include <cstdlib>
40 
41 using namespace boost::asio;
42 using namespace boost::system;
43 
44 namespace protobuf_comm {
45 
46 /** @class ProtobufStreamServer::Session <protobuf_comm/server.h>
47  * Internal class representing a client session.
48  * This class represents a connection to a particular client. It handles
49  * connection management, reading from, and writing to the client.
50  * @author Tim Niemueller
51  */
52 
53 /** Constructor.
54  * @param id ID of the client, used to address messages from within your
55  * application.
56  * @param parent Parent stream server notified about events.
57  * @param io_service ASIO I/O service to use for communication
58  */
59 ProtobufStreamServer::Session::Session(ClientID id,
60  ProtobufStreamServer * parent,
61  boost::asio::io_service &io_service)
62 : id_(id), parent_(parent), socket_(io_service)
63 {
64  in_data_size_ = 1024;
65  in_data_ = malloc(in_data_size_);
66  outbound_active_ = false;
67 }
68 
69 /** Destructor. */
70 ProtobufStreamServer::Session::~Session()
71 {
72  boost::system::error_code err;
73  if (socket_.is_open()) {
74  socket_.shutdown(ip::tcp::socket::shutdown_both, err);
75  socket_.close();
76  }
77  free(in_data_);
78 }
79 
80 /** Do processing required to start a session.
81  */
82 void
83 ProtobufStreamServer::Session::start_session()
84 {
85  remote_endpoint_ = socket_.remote_endpoint();
86 }
87 
88 /** Start reading a message on this session.
89  * This sets up a read handler to read incoming messages. It also notifies
90  * the parent server of the initiated connection.
91  */
92 void
93 ProtobufStreamServer::Session::start_read()
94 {
95  boost::asio::async_read(socket_,
96  boost::asio::buffer(&in_frame_header_, sizeof(frame_header_t)),
97  boost::bind(&ProtobufStreamServer::Session::handle_read_header,
98  shared_from_this(),
99  boost::asio::placeholders::error));
100 }
101 
102 /** Send a message.
103  * @param component_id ID of the component to address
104  * @param msg_type numeric message type
105  * @param m Message to send
106  */
107 void
108 ProtobufStreamServer::Session::send(uint16_t component_id,
109  uint16_t msg_type,
110  google::protobuf::Message &m)
111 {
112  QueueEntry *entry = new QueueEntry();
113  parent_->message_register().serialize(component_id,
114  msg_type,
115  m,
116  entry->frame_header,
117  entry->message_header,
118  entry->serialized_message);
119 
120  entry->buffers[0] = boost::asio::buffer(&entry->frame_header, sizeof(frame_header_t));
121  entry->buffers[1] = boost::asio::buffer(&entry->message_header, sizeof(message_header_t));
122  entry->buffers[2] = boost::asio::buffer(entry->serialized_message);
123 
124  std::lock_guard<std::mutex> lock(outbound_mutex_);
125  if (outbound_active_) {
126  outbound_queue_.push(entry);
127  } else {
128  outbound_active_ = true;
129  boost::asio::async_write(socket_,
130  entry->buffers,
131  boost::bind(&ProtobufStreamServer::Session::handle_write,
132  shared_from_this(),
133  boost::asio::placeholders::error,
134  boost::asio::placeholders::bytes_transferred,
135  entry));
136  }
137 }
138 
139 /** Disconnect from client. */
140 void
141 ProtobufStreamServer::Session::disconnect()
142 {
143  boost::system::error_code err;
144  if (socket_.is_open()) {
145  socket_.shutdown(ip::tcp::socket::shutdown_both, err);
146  socket_.close();
147  }
148 }
149 
150 /** Write completion handler. */
151 void
152 ProtobufStreamServer::Session::handle_write(const boost::system::error_code &error,
153  size_t /*bytes_transferred*/,
154  QueueEntry *entry)
155 {
156  delete entry;
157 
158  if (!error) {
159  std::lock_guard<std::mutex> lock(outbound_mutex_);
160  if (!outbound_queue_.empty()) {
161  QueueEntry *front_entry = outbound_queue_.front();
162  outbound_queue_.pop();
163  boost::asio::async_write(socket_,
164  front_entry->buffers,
165  boost::bind(&ProtobufStreamServer::Session::handle_write,
166  shared_from_this(),
167  boost::asio::placeholders::error,
168  boost::asio::placeholders::bytes_transferred,
169  front_entry));
170  } else {
171  outbound_active_ = false;
172  }
173  } else {
174  parent_->disconnected(shared_from_this(), error);
175  }
176 }
177 
178 /** Incoming data handler for header.
179  * This method is called if an error occurs while waiting for data (e.g. if
180  * the remote peer closes the connection), or if new data is available. This
181  * callback expectes header information to be received.
182  * @param error error code
183  */
184 void
185 ProtobufStreamServer::Session::handle_read_header(const boost::system::error_code &error)
186 {
187  if (!error) {
188  size_t to_read = ntohl(in_frame_header_.payload_size);
189  if (to_read > in_data_size_) {
190  void *new_data = realloc(in_data_, to_read);
191  if (new_data) {
192  in_data_size_ = to_read;
193  in_data_ = new_data;
194  } else {
195  parent_->disconnected(shared_from_this(), errc::make_error_code(errc::not_enough_memory));
196  }
197  }
198  // setup new read
199  boost::asio::async_read(socket_,
200  boost::asio::buffer(in_data_, to_read),
201  boost::bind(&ProtobufStreamServer::Session::handle_read_message,
202  shared_from_this(),
203  boost::asio::placeholders::error));
204  } else {
205  parent_->disconnected(shared_from_this(), error);
206  }
207 }
208 
209 /** Incoming data handler for message content.
210  * This method is called if an error occurs while waiting for data (e.g. if
211  * the remote peer closes the connection), or if new data is available. This
212  * callback expectes message to be received that conforms to a previously
213  * received header.
214  * @param error error code
215  */
216 void
217 ProtobufStreamServer::Session::handle_read_message(const boost::system::error_code &error)
218 {
219  if (!error) {
220  message_header_t *message_header = static_cast<message_header_t *>(in_data_);
221 
222  uint16_t comp_id = ntohs(message_header->component_id);
223  uint16_t msg_type = ntohs(message_header->msg_type);
224  try {
225  std::shared_ptr<google::protobuf::Message> m =
226  parent_->message_register().deserialize(in_frame_header_,
227  *message_header,
228  (char *)in_data_ + sizeof(message_header_t));
229  parent_->sig_rcvd_(id_, comp_id, msg_type, m);
230  } catch (std::runtime_error &e) {
231  // ignored, most likely unknown message tpye
232  parent_->sig_recv_failed_(id_, comp_id, msg_type, e.what());
233  }
234  start_read();
235  } else {
236  parent_->disconnected(shared_from_this(), error);
237  }
238 }
239 
240 /** @class ProtobufStreamServer <protobuf_comm/server.h>
241  * Stream server for protobuf message transmission.
242  * The server opens a TCP socket (bound to an IPv6 endpoint) and waits for incoming connections.
243  * Each incoming connection is given a unique client ID. Signals are
244  * provided that can be used to react to connections and incoming data.
245  * @author Tim Niemueller
246  */
247 
248 /** Constructor.
249  * @param port port to listen on
250  */
251 ProtobufStreamServer::ProtobufStreamServer(unsigned short port)
252 : io_service_(), acceptor_(io_service_, ip::tcp::endpoint(ip::tcp::v6(), port))
253 {
254  message_register_ = new MessageRegister();
255  own_message_register_ = true;
256  next_cid_ = 1;
257 
258  acceptor_.set_option(socket_base::reuse_address(true));
259 
260  start_accept();
261  asio_thread_ = std::thread(&ProtobufStreamServer::run_asio, this);
262 }
263 
264 /** Constructor.
265  * @param port port to listen on
266  * @param proto_path file paths to search for proto files. All message types
267  * within these files will automatically be registered and available for dynamic
268  * message creation.
269  */
271  std::vector<std::string> &proto_path)
272 : io_service_(), acceptor_(io_service_, ip::tcp::endpoint(ip::tcp::v6(), port))
273 {
274  message_register_ = new MessageRegister(proto_path);
275  own_message_register_ = true;
276  next_cid_ = 1;
277 
278  acceptor_.set_option(socket_base::reuse_address(true));
279 
280  start_accept();
281  asio_thread_ = std::thread(&ProtobufStreamServer::run_asio, this);
282 }
283 
284 /** Constructor.
285  * @param port port to listen on
286  * @param mr message register to use to (de)serialize messages
287  */
289 : io_service_(),
290  acceptor_(io_service_, ip::tcp::endpoint(ip::tcp::v6(), port)),
291  message_register_(mr),
292  own_message_register_(false)
293 {
294  next_cid_ = 1;
295 
296  acceptor_.set_option(socket_base::reuse_address(true));
297 
298  start_accept();
299  asio_thread_ = std::thread(&ProtobufStreamServer::run_asio, this);
300 }
301 
302 /** Destructor. */
304 {
305  io_service_.stop();
306  asio_thread_.join();
307  if (own_message_register_) {
308  delete message_register_;
309  }
310 }
311 
312 /** Send a message to the given client.
313  * @param client ID of the client to addresss
314  * @param component_id ID of the component to address
315  * @param msg_type numeric message type
316  * @param m message to send
317  */
318 void
320  uint16_t component_id,
321  uint16_t msg_type,
322  google::protobuf::Message &m)
323 {
324  if (sessions_.find(client) == sessions_.end()) {
325  throw std::runtime_error("Client does not exist");
326  }
327 
328  sessions_[client]->send(component_id, msg_type, m);
329 }
330 
331 /** Send a message.
332  * @param client ID of the client to addresss
333  * @param component_id ID of the component to address
334  * @param msg_type numeric message type
335  * @param m Message to send
336  */
337 void
339  uint16_t component_id,
340  uint16_t msg_type,
341  std::shared_ptr<google::protobuf::Message> m)
342 {
343  send(client, component_id, msg_type, *m);
344 }
345 
346 /** Send a message.
347  * @param client ID of the client to addresss
348  * @param m Message to send, the message must have an CompType enum type to
349  * specify component ID and message type.
350  */
351 void
352 ProtobufStreamServer::send(ClientID client, google::protobuf::Message &m)
353 {
354  const google::protobuf::Descriptor * desc = m.GetDescriptor();
355  const google::protobuf::EnumDescriptor *enumdesc = desc->FindEnumTypeByName("CompType");
356  if (!enumdesc) {
357  throw std::logic_error("Message does not have CompType enum");
358  }
359  const google::protobuf::EnumValueDescriptor *compdesc = enumdesc->FindValueByName("COMP_ID");
360  const google::protobuf::EnumValueDescriptor *msgtdesc = enumdesc->FindValueByName("MSG_TYPE");
361  if (!compdesc || !msgtdesc) {
362  throw std::logic_error("Message CompType enum hs no COMP_ID or MSG_TYPE value");
363  }
364  int comp_id = compdesc->number();
365  int msg_type = msgtdesc->number();
366  if (comp_id < 0 || comp_id > std::numeric_limits<uint16_t>::max()) {
367  throw std::logic_error("Message has invalid COMP_ID");
368  }
369  if (msg_type < 0 || msg_type > std::numeric_limits<uint16_t>::max()) {
370  throw std::logic_error("Message has invalid MSG_TYPE");
371  }
372 
373  send(client, comp_id, msg_type, m);
374 }
375 
376 /** Send a message.
377  * @param client ID of the client to addresss
378  * @param m Message to send, the message must have an CompType enum type to
379  * specify component ID and message type.
380  */
381 void
382 ProtobufStreamServer::send(ClientID client, std::shared_ptr<google::protobuf::Message> m)
383 {
384  send(client, *m);
385 }
386 
387 /** Send a message to all clients.
388  * @param component_id ID of the component to address
389  * @param msg_type numeric message type
390  * @param m message to send
391  */
392 void
393 ProtobufStreamServer::send_to_all(uint16_t component_id,
394  uint16_t msg_type,
395  google::protobuf::Message &m)
396 {
397  std::map<ClientID, boost::shared_ptr<Session>>::iterator s;
398  for (s = sessions_.begin(); s != sessions_.end(); ++s) {
399  send(s->first, component_id, msg_type, m);
400  }
401 }
402 
403 /** Send a message to all clients.
404  * @param component_id ID of the component to address
405  * @param msg_type numeric message type
406  * @param m message to send
407  */
408 void
409 ProtobufStreamServer::send_to_all(uint16_t component_id,
410  uint16_t msg_type,
411  std::shared_ptr<google::protobuf::Message> m)
412 {
413  std::map<ClientID, boost::shared_ptr<Session>>::iterator s;
414  for (s = sessions_.begin(); s != sessions_.end(); ++s) {
415  send(s->first, component_id, msg_type, m);
416  }
417 }
418 
419 /** Send a message to all clients.
420  * @param m message to send
421  */
422 void
423 ProtobufStreamServer::send_to_all(std::shared_ptr<google::protobuf::Message> m)
424 {
425  std::map<ClientID, boost::shared_ptr<Session>>::iterator s;
426  for (s = sessions_.begin(); s != sessions_.end(); ++s) {
427  send(s->first, m);
428  }
429 }
430 
431 /** Send a message to all clients.
432  * @param m message to send
433  */
434 void
435 ProtobufStreamServer::send_to_all(google::protobuf::Message &m)
436 {
437  std::map<ClientID, boost::shared_ptr<Session>>::iterator s;
438  for (s = sessions_.begin(); s != sessions_.end(); ++s) {
439  send(s->first, m);
440  }
441 }
442 
443 /** Disconnect specific client.
444  * @param client client ID to disconnect from
445  */
446 void
448 {
449  if (sessions_.find(client) != sessions_.end()) {
450  boost::shared_ptr<Session> session = sessions_[client];
451  session->disconnect();
452  }
453 }
454 
455 /** Start accepting connections. */
456 void
457 ProtobufStreamServer::start_accept()
458 {
459 #if defined(__GNUC__) && (__GNUC__ < 4 || (__GNUC__ == 4 && __GNUC_MINOR__ < 7))
460  std::lock_guard<std::mutex> lock(next_cid_mutex_);
461 #endif
462  Session::Ptr new_session(new Session(next_cid_++, this, io_service_));
463  acceptor_.async_accept(new_session->socket(),
464  boost::bind(&ProtobufStreamServer::handle_accept,
465  this,
466  new_session,
467  boost::asio::placeholders::error));
468 }
469 
470 void
471 ProtobufStreamServer::disconnected(boost::shared_ptr<Session> session,
472  const boost::system::error_code &error)
473 {
474  sessions_.erase(session->id());
475  sig_disconnected_(session->id(), error);
476 }
477 
478 void
479 ProtobufStreamServer::handle_accept(Session::Ptr new_session,
480  const boost::system::error_code &error)
481 {
482  if (!error) {
483  new_session->start_session();
484  sessions_[new_session->id()] = new_session;
485  sig_connected_(new_session->id(), new_session->remote_endpoint());
486  new_session->start_read();
487  }
488 
489  start_accept();
490 }
491 
492 void
493 ProtobufStreamServer::run_asio()
494 {
495 #if BOOST_ASIO_VERSION > 100409
496  while (!io_service_.stopped()) {
497 #endif
498  usleep(0);
499  io_service_.reset();
500  io_service_.run();
501 #if BOOST_ASIO_VERSION > 100409
502  }
503 #endif
504 }
505 
506 } // end namespace protobuf_comm
Register to map msg type numbers to Protobuf messages.
unsigned int ClientID
ID to identify connected clients.
Definition: server.h:65
void disconnect(ClientID client)
Disconnect specific client.
Definition: server.cpp:447
ProtobufStreamServer(unsigned short port)
Constructor.
Definition: server.cpp:251
void send(ClientID client, uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
Send a message to the given client.
Definition: server.cpp:319
void send_to_all(uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
Send a message to all clients.
Definition: server.cpp:393