Playframework и Twitter Streaming API

scala twitter playframework twitter-streaming-api playframework-2.5

606 просмотра

2 ответа

Как прочитать данные ответа из Twitter Streaming API - статусы / фильтр POST? Я установил соединение и получил 200 кодов статуса, но я не знаю, как читать твиты. Я просто хочу напечатать твиты по мере их поступления.

ws.url(url)
.sign(OAuthCalculator(consumerKey, requestToken))
.withMethod("POST")
.stream()
.map { response =>
  if(response.headers.status == 200)
    println(response.body)
} 

РЕДАКТИРОВАТЬ: я нашел это решение

ws.url(url)
.sign(OAuthCalculator(consumerKey, requestToken))
.withMethod("POST")
.stream()
.map { response => 
  if(response.headers.status == 200){
    response.body
      .scan("")((acc, curr) => if (acc.contains("\r\n")) curr.utf8String else acc + curr.utf8String)
      .filter(_.contains("\r\n"))
      .map(json => Try(parse(json).extract[Tweet]))
      .runForeach {
        case Success(tweet) =>
          println("-----")
          println(tweet.text)
        case Failure(e) =>
          println("-----")
          println(e.getStackTrace)
      }
  }
}
Автор: mkovacek Источник Размещён: 08.11.2019 11:17

Ответы (2)


4 плюса

Решение

Тело ответа для потокового запроса WS - Akka Streams Sourcebytes. Поскольку ответы Twitter Api разделены символами новой строки (обычно), их можно использовать Framing.delimiterдля разделения их на куски байтов, анализа фрагментов в JSON и выполнения с ними того, что вы хотите. Примерно так должно работать:

import akka.stream.scaladsl.Framing
import scala.util.{Success, Try}
import akka.util.ByteString
import play.api.libs.json.{JsSuccess, Json, Reads}
import play.api.libs.oauth.{ConsumerKey, OAuthCalculator, RequestToken}

case class Tweet(id: Long, text: String)
object Tweet {
  implicit val reads: Reads[Tweet] = Json.reads[Tweet]
}

def twitter = Action.async { implicit request =>
  ws.url("https://stream.twitter.com/1.1/statuses/filter.json?track=Rio2016")
      .sign(OAuthCalculator(consumerKey, requestToken))
      .withMethod("POST")
      .stream().flatMap { response =>
    response.body
      // Split up the byte stream into delimited chunks. Note
      // that the chunks are quite big
      .via(Framing.delimiter(ByteString.fromString("\n"), 20000))
      // Parse the chunks into JSON, and then to a Tweet.
      // A better parsing strategy would be to account for all
      // the different possible responses, but here we just
      // collect those that match a Tweet.
      .map(bytes => Try(Json.parse(bytes.toArray).validate[Tweet]))
      .collect {
        case Success(JsSuccess(tweet, _)) => tweet.text
      }
      // Print out each chunk
      .runForeach(println).map { _ =>
        Ok("done")
    }
  }
}

Примечание: чтобы материализовать поток, вам нужно ввести неявное Materializerв ваш контроллер.

Автор: Mikesname Размещён: 20.08.2016 02:29

3 плюса

вызов stream()возвращает вам Future[StreamedResponse]. Затем вам придется использовать идиомы Акка, чтобы преобразовать ByteStringкуски в этом. что-то вроде:

val stream = ws.url(url)
  .sign(OAuthCalculator(consumerKey, requestToken))
  .withMethod("POST")
  .stream()

stream flatMap { res =>
  res.body.runWith(Sink.foreach[ByteString] { bytes =>
    println(bytes.utf8String)
  })
}

обратите внимание, что я не тестировал приведенный выше код (но он основан на разделе отклика потоковой передачи https://www.playframework.com/documentation/2.5.x/ScalaWS плюс описание приемника с http: //doc.akka .io / docs / akka / 2.4.2 / scala / stream / stream -flow-and-basics.html )

Также обратите внимание, что это будет печатать каждый кусок в отдельной строке, и я не уверен, что API Twitter возвращает полные json-объекты на блок. вам может понадобиться использовать, Sink.foldесли вы хотите накапливать чанки перед их печатью.

Автор: handler Размещён: 20.08.2016 01:15
Вопросы из категории :
32x32