Home | Tutorials | Wiki | Issues
Ask Your Question
2

How can I publish to a Gazebo Topic from Python

asked 2013-03-04 09:09:15 -0500

dereck gravatar image

updated 2013-03-04 16:44:41 -0500

I am trying to build a publisher in python, for now I am using the 'Topic visualization' tool in gzclient as my subscriber.

edit retag flag offensive close merge delete

2 Answers

Sort by ยป oldest newest most voted
0

answered 2013-03-04 16:43:15 -0500

dereck gravatar image

updated 2013-03-04 16:49:53 -0500

That took a while to figure out, but this works on Gazebo 1.4.

Note that I copied all of the necessary message .proto files from the gazebo source directory into a directory './proto/msg' and compiled them with:

protoc -I='./proto/msg' --python_out='./proto' ./proto/msg/*.proto

I then made that directory a python library with

touch ./proto/__init__.py

Finally I used this script to publish some bogus data:

#!/usr/bin/env python

import socket
import time
import sys
from datetime import datetime

from proto.packet_pb2     import Packet
from proto.publish_pb2    import Publish
from proto.request_pb2    import Request
from proto.response_pb2   import Response
from proto.pose_pb2       import Pose
from proto.subscribe_pb2  import Subscribe

MASTER_TCP_IP   = '127.0.0.1'
MASTER_TCP_PORT = 11345

NODE_TCP_IP     = '127.0.0.1'
NODE_TCP_PORT   = 11451

TCP_BUFFER_SIZE = 40960


# Listen for Subscribers
s_sub = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s_sub.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s_sub.bind((NODE_TCP_IP, NODE_TCP_PORT))
s_sub.listen(5)


# Register as a Publisher with Gazebo
pk            = Packet()
pk.stamp.sec  = int(time.time())
pk.stamp.nsec = datetime.now().microsecond
pk.type       = "advertise"

pub           = Publish()
pub.topic     = "/gazebo/default/Pioneer3AT/cmd_vel"
pub.msg_type  = Pose.DESCRIPTOR.full_name
pub.host      = NODE_TCP_IP
pub.port      = NODE_TCP_PORT

pk.serialized_data = pub.SerializeToString()

s_reg = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s_reg.connect((MASTER_TCP_IP, MASTER_TCP_PORT))
s_reg.send(hex(pk.ByteSize()).rjust(8))
s_reg.send(pk.SerializeToString())


# Respond to a subscriber
try:
  conn, address = s_sub.accept()
  data = conn.recv(TCP_BUFFER_SIZE)

  # Decode Incomming Packet
  pk_sub = Packet()
  pk_sub.ParseFromString(data[8:])
  print "Packet:\n", pk_sub

  # Decode Subscription Request
  sub = Subscribe()
  sub.ParseFromString(pk_sub.serialized_data)
  print "Sub:\n", sub

  # Pack Data for Reply
  cmd_vel = Pose()
  cmd_vel.name = "Pioneer3AT"
  cmd_vel.position.x = 0.1
  cmd_vel.position.y = 0.2
  cmd_vel.position.z = 0.3
  cmd_vel.orientation.x = 1
  cmd_vel.orientation.y = 2
  cmd_vel.orientation.z = 3
  cmd_vel.orientation.w = 4

  # Publish Packet to Subscriber
  while 1:
    pk_pub            = Packet()
    pk_pub.stamp.sec  = int(time.time())
    pk_pub.stamp.nsec = datetime.now().microsecond
    pk_pub.type       = Pose.DESCRIPTOR.full_name
    pk_pub.serialized_data = cmd_vel.SerializeToString()

    conn.send(hex(cmd_vel.ByteSize()).rjust(8))
    conn.send(cmd_vel.SerializeToString())

    time.sleep(0.2)

finally:
  #This connection must remain open
  s_reg.close()
edit flag offensive delete link more
0

answered 2019-07-07 17:29:38 -0500

RobinBaruffa gravatar image

Hi, I'm reviving this post to add that it still works for gazebo 7.15.0. I have modified the code in order to get it working with python 3, there where some problems with hexadecimal to binary conversion. My code is used to run the simulation for a given number of steps, in the context of a Reinforcement Learning algorithm which needs to be repeatable and precisely timed. You have to copy and paste the useful gazebo message, compile it and turn it into a library as described in the previous answer.

#!/usr/bin/env python
import socket
import time
import sys
from datetime import datetime
sys.path.append("./proto")
from proto.packet_pb2 import Packet
from proto.publish_pb2 import Publish
from proto.request_pb2 import Request
from proto.response_pb2 import Response
from proto.world_control_pb2 import WorldControl
from proto.subscribe_pb2 import Subscribe

MASTER_TCP_IP   = '127.0.0.1'
MASTER_TCP_PORT = 11345
NODE_TCP_IP     = '127.0.0.1'
NODE_TCP_PORT   = 11451
TCP_BUFFER_SIZE = 40960
# Listen for Subscribers
s_sub = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s_sub.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s_sub.bind((NODE_TCP_IP, NODE_TCP_PORT))
s_sub.listen(5)
# Register as a Publisher with Gazebo
pk            = Packet()
pk.stamp.sec  = int(time.time())
pk.stamp.nsec = datetime.now().microsecond
pk.type       = "advertise"

pub           = Publish()
pub.topic     = "/gazebo/default/world_control"
pub.msg_type  = WorldControl.DESCRIPTOR.full_name
pub.host      = NODE_TCP_IP
pub.port      = NODE_TCP_PORT
pk.serialized_data = pub.SerializeToString()

s_reg = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s_reg.connect((MASTER_TCP_IP, MASTER_TCP_PORT))
s_reg.send(hex(pk.ByteSize()).rjust(8).encode('utf-8'))
s_reg.send(pk.SerializeToString())

conn, address = s_sub.accept()
msg_world_control = WorldControl()
msg_world_control.multi_step = 143
msg_world_control.pause = True

pk_pub            = Packet()
pk_pub.stamp.sec  = int(time.time())
pk_pub.stamp.nsec = datetime.now().microsecond
pk_pub.type       = WorldControl.DESCRIPTOR.full_name
pk_pub.serialized_data = msg_world_control.SerializeToString()

def stepSimulation(nbOfSteps):
    msg_world_control.multi_step = nbOfSteps
    conn.send(hex(msg_world_control.ByteSize()).rjust(8).encode('utf-8'))
    conn.send(msg_world_control.SerializeToString())

Keywords : run a given number of steps / iterations in gazebo, send world_control message with rospy (ROS python)

edit flag offensive delete link more
Login/Signup to Answer

Question Tools

1 follower

Stats

Asked: 2013-03-04 09:09:15 -0500

Seen: 1,886 times

Last updated: Jul 07