12 #include <mrpt/system/COutputLogger.h>
13 #include <mrpt/system/CTimeLogger.h>
14 #include <mvsim/Comms/common.h>
15 #include <mvsim/Comms/zmq_fwrds.h>
// Forward declarations of two helper structs living in mvsim::internal.
// Only their names are needed here: Client declares both as friends and
// takes an InfoPerSubscribedTopic by reference in one of its member functions.
// NOTE(review): the namespace braces are not visible in this listing.
21 namespace mvsim::internal
23 struct InfoPerService;
24 struct InfoPerSubscribedTopic;
/**
 * Client: publicly derives from mrpt::system::COutputLogger.
 *
 * The visible interface covers: getting/setting the node name and the server
 * host address, advertising / publishing / subscribing to topics, advertising
 * and calling services, requesting lists of nodes and topics, and access to
 * an mrpt::system::CTimeLogger profiler.
 *
 * NOTE(review): this listing appears to have lost its braces, access
 * specifiers and a few declaration lines (some function names below are not
 * visible). The comments here describe only what can be read from the text.
 */
48 class Client :
public mrpt::system::COutputLogger
/** Constructs a client from a node name.
 *  NOTE(review): presumably stored in nodeName_ -- confirm in the implementation. */
52 Client(
const std::string& nodeName);
/** Declared here only; by its name it changes the node name -- confirm in the
 *  implementation. */
58 void setName(
const std::string& nodeName);
/** Getter: returns a const reference to serverHostAddress_. */
60 const std::string& serverHostAddress()
const {
return serverHostAddress_; }
/** Setter: stores serverIpOrAddressName into serverHostAddress_. */
61 void serverHostAddress(
const std::string& serverIpOrAddressName)
64 serverHostAddress_ = serverIpOrAddressName;
/** Declaration of advertiseTopic(); the out-of-class definition further below is
 *  a template on T that forwards T::descriptor() to doAdvertiseTopic().
 *  NOTE(review): the `template <...>` line of this declaration is not visible. */
82 void advertiseTopic(const std::
string& topicName);
/** Takes a topic name and a google::protobuf::Message; by its name, publishes
 *  `msg` on that topic -- declaration only, confirm in the implementation. */
84 void publishTopic(const std::
string& topicName, const google::protobuf::Message& msg);
/** NOTE(review): the line naming this function is not visible. The parameter
 *  list (topic name + callback receiving `const MSG_T&`) matches the
 *  Client::subscribeTopic<MSG_T> definition further below. */
86 template <typename MSG_T>
88 const std::
string& topicName, const std::function<
void(const MSG_T&)>& callback);
/** Declaration of advertiseService(): takes a service name and a callback that
 *  maps a `const INPUT_MSG_T&` to an OUTPUT_MSG_T. Defined further below. */
90 template <typename INPUT_MSG_T, typename OUTPUT_MSG_T>
91 void advertiseService(
92 const std::
string& serviceName,
93 const std::function<OUTPUT_MSG_T(const INPUT_MSG_T&)>& callback);
/** NOTE(review): the line naming this function is not visible. The parameter
 *  list (service name, input message, output message by non-const reference)
 *  matches the Client::callService<INPUT_MSG_T, OUTPUT_MSG_T> definition below. */
95 template <typename INPUT_MSG_T, typename OUTPUT_MSG_T>
97 const std::
string& serviceName, const INPUT_MSG_T& input, OUTPUT_MSG_T& output);
/** Overload of callService() taking the input already serialized as a
 *  std::string and returning a std::string.
 *  NOTE(review): presumably the returned string is the serialized reply -- confirm. */
100 std::
string callService(const std::
string& serviceName, const std::
string& inputSerializedMsg);
/** NOTE(review): fragment of a declaration whose name and opening lines are not
 *  visible: it takes a topic name and a callable receiving a `const std::string&`
 *  and a `const std::vector<uint8_t>&`. The meaning of those two callback
 *  arguments cannot be told from here. */
104 const std::
string& topicName,
106 void(const std::
string& , const std::vector<uint8_t>& )>&
/** Returns a vector of InfoPerNode (type not visible in this listing). */
113 std::vector<InfoPerNode> requestListOfNodes();
// NOTE(review): `endpoints` and `publishers` look like fields of a nested struct
// (likely InfoPerTopic) whose header line is not visible here -- confirm.
119 std::vector<std::string> endpoints, publishers;
/** Returns a vector of InfoPerTopic (type not visible in this listing). */
121 std::vector<InfoPerTopic> requestListOfTopics();
/** Callback type receiving a `const zmq::message_t&`; used by
 *  subscribe_topic_raw() and doSubscribeTopic(). */
123 using topic_callback_t = std::function<void(
const zmq::message_t& )>;
/** Takes a topic name and a topic_callback_t, i.e. the callback receives the
 *  zmq::message_t itself. Declaration only. */
125 void subscribe_topic_raw(
const std::string& topicName,
const topic_callback_t& callback);
/** Read-only access to the internal CTimeLogger (profiler_). */
129 const mrpt::system::CTimeLogger& profiler()
const {
return profiler_; }
/** Forwards `enable` to profiler_.enable(). */
130 void enable_profiler(
bool enable) { profiler_.enable(enable); }
// Owning pointer to ZMQImpl (type declared elsewhere; presumably forward-declared
// via mvsim/Comms/zmq_fwrds.h -- confirm).
134 std::unique_ptr<ZMQImpl> zmq_;
// Server host address; defaults to "localhost". Read/written by the
// serverHostAddress() getter/setter above.
136 std::string serverHostAddress_ =
"localhost";
// Node name; defaults to "anonymous".
137 std::string nodeName_ =
"anonymous";
// Two std::thread members. NOTE(review): by their names they probably run
// internalServiceServingThread() and internalTopicUpdatesThread() -- confirm.
139 std::thread serviceInvokerThread_;
140 std::thread topicUpdatesThread_;
// String-to-string map; by its name, a cache from service name to endpoint.
// NOTE(review): serviceToEndPointCacheMtx_ presumably guards it -- confirm in
// the implementation.
142 std::map<std::string, std::string> serviceToEndPointCache_;
143 std::mutex serviceToEndPointCacheMtx_;
// Time logger constructed with the arguments (false, "mvsim::Client");
// enable_profiler() toggles it through profiler_.enable().
145 mrpt::system::CTimeLogger profiler_{
false,
"mvsim::Client"};
// Internal helpers, declarations only (behavior lives in the implementation file).
147 void doRegisterClient();
148 void doUnregisterClient();
150 void internalServiceServingThread();
151 void internalTopicUpdatesThread();
152 void internalTopicSubscribeThread(internal::InfoPerSubscribedTopic& ipt);
/** Type-erased service callback: receives a `const std::string&` and returns a
 *  shared_ptr to a google::protobuf::Message. The advertiseService() template
 *  definition builds a lambda of this shape (parses the string into the input
 *  message, returns the callback's result wrapped in a shared_ptr). */
154 using service_callback_t = std::function<std::shared_ptr<google::protobuf::Message>(
155 const std::string& )>;
/** Non-template backend of advertiseTopic(): takes the topic name plus the
 *  Protobuf descriptor of the message type. */
157 void doAdvertiseTopic(
158 const std::string& topicName,
const google::protobuf::Descriptor* descriptor);
/** Takes the service name, the Protobuf descriptors of the input and output
 *  message types, and a service_callback_t (by value). */
159 void doAdvertiseService(
160 const std::string& serviceName,
const google::protobuf::Descriptor* descIn,
161 const google::protobuf::Descriptor* descOut, service_callback_t callback);
/** Takes the topic name, the Protobuf descriptor of the message type and a
 *  topic_callback_t. */
163 void doSubscribeTopic(
164 const std::string& topicName,
const google::protobuf::Descriptor* descriptor,
165 const topic_callback_t& callback);
/** NOTE(review): the line naming this function is not visible; the
 *  callService() template definition below calls
 *  doCallService(serviceName, <serialized input>, output), which fits this
 *  parameter list. The last two optional_ref parameters default to std::nullopt. */
167 const std::string& serviceName,
const std::string& inputSerializedMsg,
168 mrpt::optional_ref<google::protobuf::Message> outputMsg,
169 mrpt::optional_ref<std::string> outputSerializedMsg = std::nullopt,
170 mrpt::optional_ref<std::string> outputMsgTypeName = std::nullopt);
// The two internal helper structs are granted friend access to Client.
172 friend struct internal::InfoPerService;
173 friend struct internal::InfoPerSubscribedTopic;
176 template <
typename T>
177 void Client::advertiseTopic(
const std::string& topicName)
179 doAdvertiseTopic(topicName, T::descriptor());
// Template definition of Client::advertiseService<INPUT_MSG_T, OUTPUT_MSG_T>.
//
// Visible pieces: a call taking (serviceName, INPUT_MSG_T::descriptor(),
// OUTPUT_MSG_T::descriptor(), <lambda>), where the lambda captures `callback`
// by copy, parses the incoming string into `in`, and returns the user
// callback's result wrapped in a std::shared_ptr<OUTPUT_MSG_T>.
//
// NOTE(review): several lines are missing from this listing (braces, the name
// of the function being called -- presumably doAdvertiseService(), whose
// parameter list matches -- and the declaration of `in`, presumably
// `INPUT_MSG_T in;`). Confirm against the real header.
// NOTE(review): the result of in.ParseFromString(inData) is discarded here, so
// a parse failure would not stop `callback` from being invoked -- consider
// checking it.
182 template <
typename INPUT_MSG_T,
typename OUTPUT_MSG_T>
183 void Client::advertiseService(
184 const std::string& serviceName,
const std::function<OUTPUT_MSG_T(
const INPUT_MSG_T&)>& callback)
187 serviceName, INPUT_MSG_T::descriptor(), OUTPUT_MSG_T::descriptor(),
189 [callback](
const std::string& inData)
192 in.ParseFromString(inData);
193 return std::make_shared<OUTPUT_MSG_T>(callback(in));
// Template definition of Client::subscribeTopic<MSG_T>.
//
// Visible pieces: a call taking (topicName, MSG_T::descriptor(), <lambda>),
// where the lambda captures `callback` by copy and receives a
// `const zmq::message_t&`.
//
// NOTE(review): the name of the function being called (presumably
// doSubscribeTopic(), whose parameter list matches) and the whole lambda body
// are missing from this listing, so how the zmq message is turned into a MSG_T
// before invoking `callback` cannot be told from here.
197 template <
typename MSG_T>
198 void Client::subscribeTopic(
199 const std::string& topicName,
const std::function<
void(
const MSG_T&)>& callback)
202 topicName, MSG_T::descriptor(),
204 [callback](
const zmq::message_t& m)
212 template <
typename INPUT_MSG_T,
typename OUTPUT_MSG_T>
213 void Client::callService(
214 const std::string& serviceName,
const INPUT_MSG_T& input, OUTPUT_MSG_T& output)
216 doCallService(serviceName, input.SerializeAsString(), output);
void parseMessage(const zmq::message_t &msg, google::protobuf::MessageLite &out)