Cannot receive Neutron notifications, although Nova notifications are received
Hi all,
I implemented an AMQP connection with kombu to listen for OpenStack notifications. We currently receive the Nova notifications, but we cannot receive the Neutron notifications. The code is shown below. It worked before we upgraded OpenStack, but it stopped working after the upgrade. Thank you for your time.
class Worker(ConsumerMixin):
    """Consume OpenStack notifications over AMQP and dispatch them by event type.

    Messages from the queues in ``event_queues`` are delivered to
    ``process_task``, which appends them to ``./listener.log`` and forwards
    them to ``handle_event``. Call ``set_env`` before ``run()`` so that the
    event-type dispatch table is populated.
    """

    # One queue per service exchange; routing_key '#' matches every topic.
    # NOTE(review): AMQP rejects redeclaring an existing exchange/queue with
    # different attributes, so the ``durable`` flags here presumably have to
    # match what the OpenStack services declared. If one service stops
    # delivering after an upgrade, verify its exchange name, durability and
    # notification driver settings -- TODO confirm against the deployment.
    event_queues = [
        Queue('notifications.neutron',
              Exchange('neutron', 'topic', durable=False),
              durable=False, routing_key='#'),
        Queue('notifications.nova',
              Exchange('nova', 'topic', durable=False),
              durable=False, routing_key='#'),
    ]

    def __init__(self, connection):
        """Store the broker connection and start with an empty dispatch table.

        Args:
            connection: the connection object this worker consumes from.
        """
        self.connection = connection
        # Populated by set_env(); initialised here so a notification that
        # arrives earlier is ignored instead of raising AttributeError.
        self.handler = None
        self.notification_responses = {}

    def set_env(self, env, inventory_collection):
        """Create the event handler and map event types to its methods.

        Args:
            env: environment identifier passed through to ``EventHandler``.
            inventory_collection: inventory collection passed to both
                ``InventoryMgr`` and ``EventHandler``.
        """
        # The instance is not kept; presumably InventoryMgr holds shared
        # state that this call configures -- TODO confirm.
        inv = InventoryMgr()
        inv.set_inventory_collection(inventory_collection)
        self.handler = EventHandler(env, inventory_collection)
        self.notification_responses = {
            "compute.instance.create.end": self.handler.instance_add,
            "network.create.end": self.handler.network_create,
            "network.update.end": self.handler.network_update,
        }

    def get_consumers(self, Consumer, channel):
        """Return a single JSON-only consumer covering all ``event_queues``."""
        return [Consumer(queues=self.event_queues,
                         accept=['json'],
                         callbacks=[self.process_task])]

    def process_task(self, body, message):
        """Log one notification, dispatch it by event type, then ack it.

        Two layouts are handled: ``event_type`` directly in ``body``, or a
        JSON string under ``body['oslo.message']`` that contains it.

        Args:
            body: decoded message payload (a dict for JSON messages).
            message: the delivered message; acknowledged after dispatch.
        """
        # Not every notification is wrapped in an 'oslo.message' envelope,
        # so look it up without assuming the key exists.
        envelope = body.get('oslo.message')
        log_line = envelope if envelope is not None else str(body)
        # Context manager guarantees the log file is closed even if the
        # write fails.
        with open("./listener.log", "a") as log_file:
            log_file.write(log_line + "\n")

        if "event_type" in body:
            self.handle_event(body["event_type"], body)
        elif envelope is not None and "event_type" in envelope:
            # Substring pre-check on the raw JSON text before parsing it.
            msg = json.loads(envelope)
            print("event_type:", msg['event_type'])
            self.handle_event(msg['event_type'], msg)
        # Acknowledge whether or not a handler was registered for the event.
        message.ack()

    def handle_event(self, type, notification):
        """Invoke the handler registered for ``type``.

        Args:
            type: the notification's event type string.
            notification: the notification dict passed to the handler.

        Returns:
            The handler's return value, or ``""`` when no handler is
            registered for ``type``.
        """
        print("got notification, event_type: " + type + '\n' + str(notification))
        if type not in self.notification_responses:
            return ""
        return self.notification_responses[type](notification)
if __name__ == '__main__':
    # Script entry point: read the AMQP settings for the chosen environment,
    # connect to the broker and run the notification worker until interrupted.
    from kombu import Connection

    logger = Logger()
    args = get_args()
    conf = Configuration(args.mongo_config)
    logger.set_loglevel(args.loglevel)
    env = args.env
    conf.use_env(env)

    amqp_config = conf.get("AMQP")
    host = amqp_config["host"]
    port = amqp_config["port"]
    user = amqp_config["user"]
    pwd = amqp_config["password"]
    # format() accepts the port as either a string or an int; plain '+'
    # concatenation raised TypeError when the port was stored as an int.
    # NOTE(review): credentials are inserted verbatim; a password containing
    # URL-reserved characters would presumably need quoting -- TODO confirm.
    connect_url = 'amqp://{}:{}@{}:{}//'.format(user, pwd, host, port)

    with Connection(connect_url) as conn:
        try:
            print(conn)
            worker = Worker(conn)
            worker.set_env(env, args.inventory)
            worker.run()
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop the listener.
            print('Stopped')