Stream Analytics десериализует JSON из Python через Event Hub

python json azure amqp asa

910 просмотра

3 ответа

1 Репутация автора

Я настроил концентратор событий Azure и отправляю сообщения AMQP в формате JSON из сценария Python и пытаюсь передать эти сообщения в Power BI с помощью Stream Analytics. Сообщения очень простое устройство активности и IoT устройства

Фрагмент Python является

msg = json.dumps({ "Hub": MAC, "DeviceID": id, "DeviceUID": ouid, "Signal": text, "Timestamp": dtz }, ensure_ascii=False, encoding='utf8')
message.body = msg
messenger.put(message)
messenger.send()

Я использовал пример средства чтения сообщений C # в руководстве по MS для чтения данных из концентратора событий без проблем, вывод:

Message received.  Partition: '2', Data: '??{"DeviceUID": "z_70b3d515200002e7_0", "Signal": "/on?1", "DeviceID": "1", "Hub": "91754623489", "Timestamp": "2016-07-15T07:56:50.277440Z"}'

Но когда я пытаюсь проверить входные данные Stream Analytics из концентратора событий, я получаю сообщение об ошибке

Диагностика: не удалось десериализовать входное событие как Json. Некоторые возможные причины: 1) искаженные события 2) источник входного сигнала настроен с неверным форматом сериализации

Я не уверен, что означают искаженные события - я предположил, что Stream Analytics может справиться с данными, отправленными в концентратор событий через AMQP?

Я не вижу ничего плохого в JSON, полученном приложением C # - разве символ BOM вызывает проблему?

Это моя первая попытка во всем этом, и я безуспешно искал похожие посты, поэтому я был бы очень признателен, если бы кто-то указал мне правильное направление.

Приветствия Роб

Автор: Rob Lewis Источник Размещён: 15.07.2016 08:12

Ответы (3)


2 плюса

406 Репутация автора

Это вызвано несовместимостью API клиента. Python использует Proton для отправки строки JSON в теле сообщения AMQP Value. Тело кодируется в виде строки AMQP (байты кодирования типа AMQP + кодированные байты utf8 строки). Stream Analytics использует служебную шину .Net SDK, которая представляет сообщение AMQP в качестве EventData, а ее тело всегда представляет собой байтовый массив. Для сообщения значения AMQP оно включает байты кодирования типа AMQP, так как без них невозможно декодировать следующее значение. Эти дополнительные байты в начале приведут к сбою сериализации JSON.

Для достижения совместимости в теле сообщения приложение должно обеспечить, чтобы издатель и потребитель согласовали его тип и кодировку. В этом случае издатель должен отправить необработанные байты в сообщении AMQP Data. С Proton Python API вы можете попробовать это:

message.body = msg.encode('utf-8')

Другим обходным решением является отправка простых типов (например, строки) в свойствах приложения.

Другие люди также столкнулись с этой проблемой. https://github.com/Azure/amqpnetlite/issues/117

Автор: Xin Chen Размещён: 17.07.2016 05:56

0 плюса

15876 Репутация автора

Как сказал @XinChen, проблема была вызвана протоколом AMQP.

По моему опыту, два обходных пути ниже эффективны для этого случая.

  1. Использование Send EventREST API вместо Azure Python SDK с AMQP, но остальные API основаны на протоколе HTTP, который не обеспечивает высокую производительность.
  2. Отправка сообщения JSON с кодировкой Base64, а затем декодирование полученного сообщения в строку JSON.
Автор: Peter Pan Размещён: 25.07.2016 06:55

0 плюса

648 Репутация автора

Эти две вещи работали для меня:

  • Добавлять message.inferred = True
  • убедитесь, что указанная вами дамп кодируется encoding='utf-8'не так, encoding='utf8'как в вашем примере.

Обновленный ОП:

msg = json.dumps({ "Hub": MAC, "DeviceID": id, "DeviceUID": ouid, "Signal": text, "Timestamp": dtz }, ensure_ascii=False, encoding='utf-8')
message.body = msg
message.inferred = True
messenger.put(message)
messenger.send()

Добавляя выведенный флаг, я думаю, что сериализатор сообщений может правильно сделать вывод, что тело является байтами, и создать AMPQ DATA, таким образом обращаясь к точке @Xin Chen.

Флаг вывода для сообщения указывает, как содержимое сообщения кодируется в разделы AMQP. Если вывод равен true, то двоичные и списочные значения в теле сообщения будут закодированы как разделы AMQP DATA и AMQP SEQUENCE соответственно. Если вывод равен false, то все значения в теле сообщения будут закодированы как разделы AMQP VALUE независимо от их типа.

re: Qpid Proton Docs #inferred

re: JSON Encoder and Decoder #dumps

Автор: donnie_armstrong Размещён: 28.07.2016 09:48
Вопросы из категории :
32x32