Fawkes API  Fawkes Development Version
openprs_mp_proxy.cpp
1 
2 /***************************************************************************
3  * openprs_mp_proxy.h - OpenPRS message passer proxy
4  *
5  * Created: Tue Aug 19 16:59:27 2014
6  * Copyright 2014 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version. A runtime exception applies to
13  * this software (see LICENSE.GPL_WRE file mentioned below for details).
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
21  */
22 
23 #include "openprs_mp_proxy.h"
24 
25 #include <core/exception.h>
26 #include <logging/logger.h>
27 
28 #include <boost/bind.hpp>
29 #include <boost/lexical_cast.hpp>
30 
31 using namespace boost::asio;
32 
33 // Types copied from OPRS because they are not public there
34 namespace OPRS {
35 typedef enum { MESSAGE_MT = 1, BROADCAST_MT, MULTICAST_MT, DISCONNECT_MT } Message_Type;
36 typedef enum { REGISTER_OK, REGISTER_NAME_CONFLICT, REGISTER_DENIED } Register_Type;
37 typedef enum { MESSAGES_PT, STRINGS_PT } Protocol_Type;
38 } // namespace OPRS
39 
40 namespace fawkes {
41 
42 /** @class OpenPRSMessagePasserProxy "openprs_mp_proxy.h"
43  * Proxy for the OpenPRS server communication.
44  * Using this proxy allows to inject commands into the communication between
45  * oprs-server and oprs (or xoprs).
46  * @author Tim Niemueller
47  */
48 
49 /** Constructor.
50  * @param tcp_port port to listen on for incoming connections
51  * @param mp_host host of mp-oprs to connect to
52  * @param mp_port TCP port that mp-oprs listens on
53  * @param logger logger for informational messages
54  */
55 OpenPRSMessagePasserProxy::OpenPRSMessagePasserProxy(unsigned short tcp_port,
56  const std::string &mp_host,
57  unsigned short mp_port,
58  fawkes::Logger * logger)
59 : io_service_work_(io_service_),
60  acceptor_(io_service_, ip::tcp::endpoint(ip::tcp::v6(), tcp_port)),
61  mp_host_(mp_host),
62  mp_port_(mp_port),
63  logger_(logger)
64 {
65  acceptor_.set_option(socket_base::reuse_address(true));
66  io_service_thread_ = std::thread([this]() { this->io_service_.run(); });
67  start_accept();
68 }
69 
70 /** Destructor. */
72 {
73  io_service_.stop();
74  io_service_thread_.join();
75 }
76 
77 /** Start accepting connections. */
78 void
79 OpenPRSMessagePasserProxy::start_accept()
80 {
81  Mapping::Ptr mapping(new Mapping(io_service_, mp_host_, mp_port_, logger_));
82  acceptor_.async_accept(mapping->client_socket,
83  boost::bind(&OpenPRSMessagePasserProxy::handle_accept,
84  this,
85  mapping,
86  boost::asio::placeholders::error));
87 }
88 
89 void
90 OpenPRSMessagePasserProxy::handle_accept(Mapping::Ptr mapping,
91  const boost::system::error_code &error)
92 {
93  if (!error) {
94  mappings_.push_back(mapping);
95  mapping->start();
96  }
97 
98  start_accept();
99 }
100 
101 OpenPRSMessagePasserProxy::Mapping::Mapping(boost::asio::io_service &io_service,
102  const std::string & mp_host,
103  unsigned short mp_port,
104  fawkes::Logger * logger)
105 : io_service_(io_service),
106  resolver_(io_service_),
107  server_host_(mp_host),
108  server_port_(mp_port),
109  logger_(logger),
110  server_in_reg_reply_(0),
111  server_in_str_len_(0),
112  client_in_msg_type_(0),
113  client_socket(io_service_),
114  server_socket(io_service_)
115 {
116 }
117 
118 /** Destruct mapping.
119  * This closes both, client and server sockets. This destructor
120  * assumes that the io_service has been cancelled.
121  */
122 OpenPRSMessagePasserProxy::Mapping::~Mapping()
123 {
124  boost::system::error_code err;
125  client_socket.shutdown(ip::tcp::socket::shutdown_both, err);
126  client_socket.close();
127  server_socket.shutdown(ip::tcp::socket::shutdown_both, err);
128  server_socket.close();
129 }
130 
131 /** A client has connected, start this mapping. */
132 void
133 OpenPRSMessagePasserProxy::Mapping::start()
134 {
135  client_prot = read_int_from_socket(client_socket);
136  client_name = read_string_from_socket(client_socket);
137 
138  logger_->log_info("OPRS-mp-proxy", "Client %s connected", client_name.c_str());
139 
140  ip::tcp::resolver::query query(server_host_, boost::lexical_cast<std::string>(server_port_));
141  resolver_.async_resolve(query,
142  boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_resolve,
143  this,
144  boost::asio::placeholders::error,
145  boost::asio::placeholders::iterator));
146 }
147 
148 bool
149 OpenPRSMessagePasserProxy::Mapping::alive() const
150 {
151  return client_socket.is_open();
152 }
153 
154 /** Disconnect this client. */
155 void
156 OpenPRSMessagePasserProxy::Mapping::disconnect()
157 {
158  disconnect("disconnect", "API call");
159 }
160 
161 void
162 OpenPRSMessagePasserProxy::Mapping::disconnect(const char *where, const char *reason)
163 {
164  logger_->log_warn(
165  "OPRS-mp-proxy", "Client %s disconnected (%s: %s)", client_name.c_str(), where, reason);
166  boost::system::error_code ec;
167  client_socket.shutdown(ip::tcp::socket::shutdown_both, ec);
168  client_socket.close();
169 }
170 
171 void
172 OpenPRSMessagePasserProxy::Mapping::handle_resolve(const boost::system::error_code &err,
173  ip::tcp::resolver::iterator endpoint_iterator)
174 {
175  if (!err) {
176  // Attempt a connection to each endpoint in the list until we
177  // successfully establish a connection.
178 #if BOOST_ASIO_VERSION > 100409
179  boost::asio::async_connect(server_socket,
180  endpoint_iterator,
181 #else
182  server_socket.async_connect(*endpoint_iterator,
183 #endif
184  boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_connect,
185  this,
186  boost::asio::placeholders::error));
187  } else {
188  disconnect("handle_resolve", err.message().c_str());
189  }
190 }
191 
192 void
193 OpenPRSMessagePasserProxy::Mapping::handle_connect(const boost::system::error_code &err)
194 {
195  if (!err) {
196  write_int_to_socket(server_socket, client_prot);
197  write_string_to_socket(server_socket, client_name);
198 
199  // asynchronously read registration reply
200  boost::asio::async_read(
201  server_socket,
202  boost::asio::buffer(&server_in_reg_reply_, sizeof(server_in_reg_reply_)),
203  boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_recv_server_reg_reply,
204  this,
205  boost::asio::placeholders::error));
206 
207  } else {
208  disconnect("handle_connect", err.message().c_str());
209  }
210 }
211 
212 void
213 OpenPRSMessagePasserProxy::Mapping::handle_recv_server_reg_reply(
214  const boost::system::error_code &err)
215 {
216  write_int_to_socket(client_socket, server_in_reg_reply_);
217 
218  if (server_in_reg_reply_ == OPRS::REGISTER_OK) {
219  start_recv_client();
220  start_recv_server();
221  } else {
222  disconnect("recv_server_reg_reply", err.message().c_str());
223  }
224 }
225 
226 void
227 OpenPRSMessagePasserProxy::Mapping::start_recv_client()
228 {
229  boost::asio::async_read(client_socket,
230  boost::asio::buffer(&client_in_msg_type_, sizeof(client_in_msg_type_)),
231  boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_recv_client,
232  this,
233  boost::asio::placeholders::error));
234 }
235 
236 void
237 OpenPRSMessagePasserProxy::Mapping::handle_recv_client(const boost::system::error_code &err)
238 {
239  if (!err) {
240  try {
241  std::vector<std::string> multicast_recipients;
242  std::string message;
243  std::string recipient;
244 
245  client_in_msg_type_ = ntohl(client_in_msg_type_);
246 
247  switch (client_in_msg_type_) {
248  case OPRS::DISCONNECT_MT:
249  logger_->log_info("OPRS-mp-proxy", "Disconnecting %s", client_name.c_str());
250  disconnect("recv_client", "Client disconnected");
251  return;
252 
253  case OPRS::MESSAGE_MT: recipient = read_string_from_socket(client_socket); break;
254 
255  case OPRS::MULTICAST_MT:
256  multicast_recipients.resize(read_int_from_socket(client_socket));
257  break;
258 
259  case OPRS::BROADCAST_MT: break; // nothing to do here
260 
261  default: disconnect("recv_client", "Unknown message type"); return;
262  }
263 
264  message = read_string_from_socket(client_socket);
265 
266  if (client_in_msg_type_ == OPRS::MULTICAST_MT) {
267  for (size_t i = 0; i < multicast_recipients.size(); ++i) {
268  multicast_recipients[i] = read_string_from_socket(client_socket);
269  }
270  }
271 
272  // debug output
273  switch (client_in_msg_type_) {
274  case OPRS::MESSAGE_MT:
275  logger_->log_info("OPRS-mp-proxy",
276  "Forwarding unicast %s->%s: '%s'",
277  client_name.c_str(),
278  recipient.c_str(),
279  message.c_str());
280  break;
281 
282  case OPRS::MULTICAST_MT: {
283  std::string recipients;
284  for (size_t i = 0; i < multicast_recipients.size(); ++i) {
285  if (i > 0)
286  recipients += ", ";
287  recipients += multicast_recipients[i];
288  }
289 
290  logger_->log_info("OPRS-mp-proxy",
291  "Forwarding multicast %s->(%s): '%s'",
292  client_name.c_str(),
293  recipients.c_str(),
294  message.c_str());
295  } break;
296 
297  case OPRS::BROADCAST_MT:
298  logger_->log_info("OPRS-mp-proxy",
299  "Forwarding broadcast %s->*: '%s'",
300  client_name.c_str(),
301  message.c_str());
302  break;
303 
304  default: break;
305  }
306 
307  // now re-send message to server
308  write_int_to_socket(server_socket, client_in_msg_type_);
309 
310  switch (client_in_msg_type_) {
311  case OPRS::MESSAGE_MT:
312  write_string_to_socket(server_socket, recipient);
313  write_string_to_socket(server_socket, message);
314  break;
315 
316  case OPRS::MULTICAST_MT:
317  write_string_to_socket(server_socket, message);
318  for (size_t i = 0; i < multicast_recipients.size(); ++i) {
319  write_string_to_socket(server_socket, multicast_recipients[i]);
320  }
321  break;
322 
323  case OPRS::BROADCAST_MT: // nothing to do here
324  write_string_to_socket(server_socket, message);
325  break;
326 
327  default: break; // cannot happen here anymore
328  }
329 
330  start_recv_client();
331  } catch (Exception &e) {
332  disconnect("recv_client", e.what_no_backtrace());
333  }
334  } else {
335  disconnect("recv_client", err.message().c_str());
336  }
337 }
338 
339 void
340 OpenPRSMessagePasserProxy::Mapping::start_recv_server()
341 {
342  if (client_prot == OPRS::MESSAGES_PT) {
343  logger_->log_warn("OPRS-mp-proxy",
344  "Starting listening for %s in MESSAGES_PT mode",
345  client_name.c_str());
346  boost::asio::async_read_until(
347  server_socket,
348  server_buffer_,
349  '\n',
350  boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_recv_server_message_pt,
351  this,
352  boost::asio::placeholders::error));
353  } else {
354  // tried async_read_some with null buffers but always immediately fires without data available
355  logger_->log_warn("OPRS-mp-proxy",
356  "Starting listening for %s in STRINGS_PT mode",
357  client_name.c_str());
358  server_socket.async_read_some(
359  boost::asio::null_buffers(),
360  boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_recv_server_strings_pt,
361  this,
362  boost::asio::placeholders::error));
363  }
364 }
365 
366 void
367 OpenPRSMessagePasserProxy::Mapping::handle_recv_server_message_pt(
368  const boost::system::error_code &err)
369 {
370  if (!err) {
371  std::string line;
372  std::istream in_stream(&server_buffer_);
373  std::getline(in_stream, line);
374 
375  logger_->log_info("OPRS-mp-proxy",
376  "Forwarding server ->%s: '%s\\n'",
377  client_name.c_str(),
378  line.c_str());
379 
380  // resend to client
381  write_string_newline_to_socket(client_socket, line);
382 
383  start_recv_server();
384  } else {
385  disconnect("recv_server_message_pt", err.message().c_str());
386  }
387 }
388 
389 void
390 OpenPRSMessagePasserProxy::Mapping::handle_recv_server_strings_pt(
391  const boost::system::error_code &err)
392 {
393  if (!err) {
394  try {
395  std::string sender = read_string_from_socket(server_socket);
396  std::string message = read_string_from_socket(server_socket);
397 
398  logger_->log_info("OPRS-mp-proxy",
399  "Forwarding server %s->%s: '%s'",
400  sender.c_str(),
401  client_name.c_str(),
402  message.c_str());
403 
404  // resend to client
405  write_string_to_socket(client_socket, sender);
406  write_string_to_socket(client_socket, message);
407 
408  start_recv_server();
409  } catch (Exception &e) {
410  disconnect("recv_server_strings_pt", e.what_no_backtrace());
411  }
412  } else {
413  disconnect("recv_server_strings_pt", err.message().c_str());
414  }
415 }
416 
417 int
418 OpenPRSMessagePasserProxy::Mapping::read_int_from_socket(boost::asio::ip::tcp::socket &socket)
419 {
420  int32_t value;
421  boost::system::error_code ec;
422  boost::asio::read(socket, boost::asio::buffer(&value, sizeof(value)), ec);
423  if (ec) {
424  throw Exception("Failed to read int from socket: %s", ec.message().c_str());
425  } else {
426  return ntohl(value);
427  }
428 }
429 
430 std::string
431 OpenPRSMessagePasserProxy::Mapping::read_string_from_socket(boost::asio::ip::tcp::socket &socket)
432 {
433  uint32_t s_size = 0;
434  boost::system::error_code ec;
435  boost::asio::read(socket, boost::asio::buffer(&s_size, sizeof(s_size)), ec);
436  if (ec) {
437  throw Exception("Failed to read string size from socket: %s", ec.message().c_str());
438  }
439  s_size = ntohl(s_size);
440 
441  char s[s_size + 1];
442  boost::asio::read(socket, boost::asio::buffer(s, s_size), ec);
443  if (ec) {
444  throw Exception("Failed to read string content from socket: %s", ec.message().c_str());
445  }
446  s[s_size] = 0;
447 
448  return s;
449 }
450 
451 void
452 OpenPRSMessagePasserProxy::Mapping::write_int_to_socket(boost::asio::ip::tcp::socket &socket, int i)
453 {
454  boost::system::error_code ec;
455  int32_t value = htonl(i);
456  boost::asio::write(socket, boost::asio::buffer(&value, sizeof(value)), ec);
457  if (ec) {
458  throw Exception("Failed to write int to socket: %s", ec.message().c_str());
459  }
460 }
461 
462 void
463 OpenPRSMessagePasserProxy::Mapping::write_string_to_socket(boost::asio::ip::tcp::socket &socket,
464  std::string & str)
465 {
466  boost::system::error_code ec;
467  uint32_t s_size = htonl(str.size());
468  std::array<boost::asio::const_buffer, 2> buffers;
469  buffers[0] = boost::asio::buffer(&s_size, sizeof(s_size));
470  buffers[1] = boost::asio::buffer(str.c_str(), str.size());
471 
472  boost::asio::write(socket, buffers, ec);
473  if (ec) {
474  throw Exception("Failed to write string to socket: %s", ec.message().c_str());
475  }
476 }
477 
478 void
479 OpenPRSMessagePasserProxy::Mapping::write_string_newline_to_socket(
480  boost::asio::ip::tcp::socket &socket,
481  const std::string & str)
482 {
483  boost::system::error_code ec;
484  std::string s = str + "\n";
485  boost::asio::write(socket, boost::asio::buffer(s.c_str(), s.size()), ec);
486  if (ec) {
487  throw Exception("Failed to write string to socket: %s", ec.message().c_str());
488  }
489 }
490 
491 } // end namespace fawkes
Interface for logging.
Definition: logger.h:42
virtual ~OpenPRSMessagePasserProxy()
Destructor.
Fawkes library namespace.