Ask Your Question

Revision history [back]

click to hide/show revision 1
initial version

Can not receive neutron notification while can receive nova notification

Hi all,

I implemented AMQP connection to listen OpenStack notification by kombu. Now, we can receive the nova notification right now. However, we can not receive neutron notification. The code shows below. Before we upgrade the OpenStack, our code can work, while it does not work after we upgrade the OpenStack. Thank you for your time.

class Worker(ConsumerMixin):
    event_queues = [
        Queue('notifications.neutron',
              Exchange('neutron', 'topic', durable=False),
              durable=False, routing_key='#'),
        Queue('notifications.nova',
              Exchange('nova', 'topic', durable=False),
              durable=False, routing_key='#'),
    ]

    def __init__(self, connection):
        self.connection = connection

    def set_env(self, env, inventory_collection):
        inv = InventoryMgr()
        inv.set_inventory_collection(inventory_collection)
        self.handler = EventHandler(env, inventory_collection)
        self.notification_responses = {
            "compute.instance.create.end": self.handler.instance_add,
            "network.create.end": self.handler.network_create,
            "network.update.end": self.handler.network_update,

        }

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=self.event_queues,
                         accept=['json'],
                         callbacks=[self.process_task])]

    def process_task(self, body, message):
        f = open("./listener.log", "a")
        f.write(body['oslo.message'] + "\n")
        f.close()
        if "event_type" in body:
            self.handle_event(body["event_type"], body)
        elif "event_type" in body['oslo.message']:
            msg = json.loads(body['oslo.message'])
            print("event_type:", msg['event_type'])
            self.handle_event(msg['event_type'], msg)
        message.ack()

    def handle_event(self, type, notification):
        print("got notification, event_type: " + type + '\n' + str(notification))
        if type not in self.notification_responses.keys():
            return ""
        return self.notification_responses[type](notification)

if __name__ == '__main__':
    logger = Logger()
    from kombu import Connection
    args = get_args()
    conf = Configuration(args.mongo_config)
    logger.set_loglevel(args.loglevel)
    env = args.env
    conf.use_env(env)
    amqp_config = conf.get("AMQP")
    host = amqp_config["host"]
    port = amqp_config["port"]
    user = amqp_config["user"]
    pwd = amqp_config["password"]
    connect_url = 'amqp://' + user + ':' + pwd + '@' + host + ':' + port + '//'
    with Connection(connect_url) as conn:
        try:
            print(conn)
            worker = Worker(conn)
            worker.set_env(env, args.inventory)
            worker.run()
        except KeyboardInterrupt:
            print('Stopped')