How can I publish to a Gazebo Topic from Python
I am trying to build a publisher in python, for now I am using the 'Topic visualization' tool in gzclient as my subscriber.
I am trying to build a publisher in python, for now I am using the 'Topic visualization' tool in gzclient as my subscriber.
That took a while to figure out, but this works on Gazebo 1.4.
Note that I copied all of the necessary message .proto files from the gazebo source directory into a directory './proto/msg' and compiled them with:
protoc -I='./proto/msg' --python_out='./proto' ./proto/msg/*.proto
I then made that directory a python library with
touch ./proto/__init__.py
Finally I used this script to publish some bogus data:
#!/usr/bin/env python
import socket
import time
import sys
from datetime import datetime
from proto.packet_pb2 import Packet
from proto.publish_pb2 import Publish
from proto.request_pb2 import Request
from proto.response_pb2 import Response
from proto.pose_pb2 import Pose
from proto.subscribe_pb2 import Subscribe
MASTER_TCP_IP = '127.0.0.1'
MASTER_TCP_PORT = 11345
NODE_TCP_IP = '127.0.0.1'
NODE_TCP_PORT = 11451
TCP_BUFFER_SIZE = 40960
# Listen for Subscribers
s_sub = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s_sub.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s_sub.bind((NODE_TCP_IP, NODE_TCP_PORT))
s_sub.listen(5)
# Register as a Publisher with Gazebo
pk = Packet()
pk.stamp.sec = int(time.time())
pk.stamp.nsec = datetime.now().microsecond
pk.type = "advertise"
pub = Publish()
pub.topic = "/gazebo/default/Pioneer3AT/cmd_vel"
pub.msg_type = Pose.DESCRIPTOR.full_name
pub.host = NODE_TCP_IP
pub.port = NODE_TCP_PORT
pk.serialized_data = pub.SerializeToString()
s_reg = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s_reg.connect((MASTER_TCP_IP, MASTER_TCP_PORT))
s_reg.send(hex(pk.ByteSize()).rjust(8))
s_reg.send(pk.SerializeToString())
# Respond to a subscriber
try:
conn, address = s_sub.accept()
data = conn.recv(TCP_BUFFER_SIZE)
# Decode Incomming Packet
pk_sub = Packet()
pk_sub.ParseFromString(data[8:])
print "Packet:\n", pk_sub
# Decode Subscription Request
sub = Subscribe()
sub.ParseFromString(pk_sub.serialized_data)
print "Sub:\n", sub
# Pack Data for Reply
cmd_vel = Pose()
cmd_vel.name = "Pioneer3AT"
cmd_vel.position.x = 0.1
cmd_vel.position.y = 0.2
cmd_vel.position.z = 0.3
cmd_vel.orientation.x = 1
cmd_vel.orientation.y = 2
cmd_vel.orientation.z = 3
cmd_vel.orientation.w = 4
# Publish Packet to Subscriber
while 1:
pk_pub = Packet()
pk_pub.stamp.sec = int(time.time())
pk_pub.stamp.nsec = datetime.now().microsecond
pk_pub.type = Pose.DESCRIPTOR.full_name
pk_pub.serialized_data = cmd_vel.SerializeToString()
conn.send(hex(cmd_vel.ByteSize()).rjust(8))
conn.send(cmd_vel.SerializeToString())
time.sleep(0.2)
finally:
#This connection must remain open
s_reg.close()
Hi, I'm reviving this post to add that it still works for gazebo 7.15.0. I have modified the code in order to get it working with python 3, there where some problems with hexadecimal to binary conversion. My code is used to run the simulation for a given number of steps, in the context of a Reinforcement Learning algorithm which needs to be repeatable and precisely timed. You have to copy and paste the useful gazebo message, compile it and turn it into a library as described in the previous answer.
#!/usr/bin/env python
import socket
import time
import sys
from datetime import datetime
sys.path.append("./proto")
from proto.packet_pb2 import Packet
from proto.publish_pb2 import Publish
from proto.request_pb2 import Request
from proto.response_pb2 import Response
from proto.world_control_pb2 import WorldControl
from proto.subscribe_pb2 import Subscribe
MASTER_TCP_IP = '127.0.0.1'
MASTER_TCP_PORT = 11345
NODE_TCP_IP = '127.0.0.1'
NODE_TCP_PORT = 11451
TCP_BUFFER_SIZE = 40960
# Listen for Subscribers
s_sub = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s_sub.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s_sub.bind((NODE_TCP_IP, NODE_TCP_PORT))
s_sub.listen(5)
# Register as a Publisher with Gazebo
pk = Packet()
pk.stamp.sec = int(time.time())
pk.stamp.nsec = datetime.now().microsecond
pk.type = "advertise"
pub = Publish()
pub.topic = "/gazebo/default/world_control"
pub.msg_type = WorldControl.DESCRIPTOR.full_name
pub.host = NODE_TCP_IP
pub.port = NODE_TCP_PORT
pk.serialized_data = pub.SerializeToString()
s_reg = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s_reg.connect((MASTER_TCP_IP, MASTER_TCP_PORT))
s_reg.send(hex(pk.ByteSize()).rjust(8).encode('utf-8'))
s_reg.send(pk.SerializeToString())
conn, address = s_sub.accept()
msg_world_control = WorldControl()
msg_world_control.multi_step = 143
msg_world_control.pause = True
pk_pub = Packet()
pk_pub.stamp.sec = int(time.time())
pk_pub.stamp.nsec = datetime.now().microsecond
pk_pub.type = WorldControl.DESCRIPTOR.full_name
pk_pub.serialized_data = msg_world_control.SerializeToString()
def stepSimulation(nbOfSteps):
msg_world_control.multi_step = nbOfSteps
conn.send(hex(msg_world_control.ByteSize()).rjust(8).encode('utf-8'))
conn.send(msg_world_control.SerializeToString())
Keywords : run a given number of steps / iterations in gazebo, send world_control message with rospy (ROS python)
hey man I have put this code as a ROS node and I am trying to have a subscriber that receives a message from another ROS node ( that has RL algorithim ) to call the stepSimulation function. However if I write the rospy subscriber after the end of your code, it just doesn't get read and it's just not there in the rqt_graph
Asked: 2013-03-04 09:09:15 -0500
Seen: 3,723 times
Last updated: Jul 07 '19