@hadoopusers

Страница 33 из 182
Iaroslav
31.08.2017
18:37:21
а кто-нибудь пробовал Spark streaming юзать с интервалом в пару сотень миллисекунд? Это вообще жизнеспособно?

Grigory
31.08.2017
18:37:38
зависит от того что ты кормишь ей

и как кормишь*

Iaroslav
31.08.2017
18:57:51
ну если это structured streaming из кафки в jdbc

Google
Iaroslav
31.08.2017
18:57:59
с парой сотней колонок

Daniel
31.08.2017
19:02:20
просто переложить надо?

Iaroslav
31.08.2017
19:10:12
просто переложить надо?
не. там 1 к 1 трансформации + зааплаить модель, что по сути тоже 1 в 1 трансформация. То есть кафка стрим -> пайплайн линейный -> jdbc

Daniel
31.08.2017
19:12:09
кафка коннектор заюзать и не тащить спарк

трансформации же в итоге в кафка стримс, коннектором сложить по jdbc

Iaroslav
31.08.2017
19:14:47
спарк здесь принципиально нужен

так как весь ETL построен на DataFrame'ах уже

Nick
31.08.2017
19:56:19
ну если это structured streaming из кафки в jdbc
а что за база? и как она живет?

Iaroslav
31.08.2017
20:00:02
а что за база? и как она живет?
проблема в том, что может быть что угодно. Хоть редшифт, хоть постгресс. Какой драйвер подсунут - то и будет. То есть я понимаю что итоговый сроупут от этого очень зависит, но я пока пытаюсь понять какой теоретический сроупут вообще можно из этого выжать. Задача предоставить такой near real time, что прям вот near near. При этом 100 миллисекунд латенси - вроде как допустима, по крайней мере на это число ориентир

Daniel
01.09.2017
03:58:58
если все прибито к спарку, то конечно стоит попробовать с ним такие задержки для него могут иметь заметные оверхеды, большую зависимость от сети, но возможны. в случае излишней головной боли, имхо, надо использовать другой движок

Oleksandr
01.09.2017
12:42:08
и снова по apache phoenix — кто-то юзал его batch upsert ?

Dmitry
04.09.2017
09:32:08
насущный вопрос - есть long-running application в спарке запущенном в cluster-mode на Amazon EMR. Как бы его получше мониторить?

Google
Dmitry
04.09.2017
09:32:24
т.е. рестарты вроде кластер сам обеспечит

а вот если после рестартов, приложение упало перманентно. кто-то занимался таким оповещением?

KrivdaTheTriewe
04.09.2017
09:37:42
стриминг ?

Daniel
04.09.2017
10:05:43
у нас админ на баше скрипт сделал, который дергает апи ярна и перезапускает джоб в случае падения до недавних пор хватало сейчас есть проблема, что достаточно часто более приоритетная очередь вытесняет наши стримы и без нормального мониторинга за этим уже не уследить

Andrey
04.09.2017
10:06:21
насущный вопрос - есть long-running application в спарке запущенном в cluster-mode на Amazon EMR. Как бы его получше мониторить?
у приложения есть свой ID (его можно получить через yarn application -list) Дальше уже вариантов много - можно скриптами проверять, можно через API YARN всю информацию о приложении получать

KrivdaTheTriewe
04.09.2017
10:08:03
если амбари есть, то там есть свой мониторинг

но у вас EMR

Artem
04.09.2017
10:08:48
ну есть MR каунтеры, их можно мониторить в тч их изменение - будет понятно висит job-а или нет

Dmitry
04.09.2017
11:20:00
удивлен что надо самому что-то придумывать (

Artem
04.09.2017
11:21:32
Беда, но иногда в разработке такое случается, что поделать ?

KrivdaTheTriewe
04.09.2017
11:30:00
мне кажется проще делать метрики прям в джобе)

Artem
04.09.2017
11:43:42
тут вопрос куда писать эти метрики, mr job-то распределенный. counter-ы это по сути и есть агрегация данных с каждого маппера-редусера

KrivdaTheTriewe
04.09.2017
11:44:19
ну опентсдб

Andrey
04.09.2017
13:34:09
в org.elasticsearch.spark.sql параметр spark.es.http.timeout работает у кого нибудь?

что 5секунд его ставь, что 10 минут, таймаут происходит ровно через 2 минуты

Ilia
05.09.2017
13:33:25
Всем привет, есть вопросик, возможно нубский, насчёт обработки данных спарком. Есть метрики записанные в csv формата (metric_id, object_id, timestamp, value). Сочетание (metric_id, object_id, timestamp) можно считать уникальным. Надо преобразовать csv таким образом чтобы в конечном итоге получился dataframe - (object_id, timestamp, <metric_0>, <metric_1>, ..., <metric_N>), где названия колонок metric_* известны заранее и совпадают со значениями из metric_id, а значения в этих колонках берутся из value. Хотелось бы собирать такой датафрейм в один проход по csv, например, создавать RDD такой же структуры как csv и потом делать filter -> map -> groupBy(object_id, timestamp) -> sum(*metrics). Это работает если значения численные - во время map пишу ноль во все метрики кроме данной и потом каждое значение складывается с нолями в groupBy -> sum. Но это не поможет с текстовыми значениями и вообще выглядит костыльно. Может есть более правильный способ решения такой задачи?

Alexander
05.09.2017
13:47:15
https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html - не оно?

Ilia
05.09.2017
14:04:15
https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html - не оно?
Близко, да, но там всё равно в итоге надо агрегацию делать по значениям. Сейчас пробую groupByKey() использовать для rdd вида ((object_id, timestamp), (metric_id, value)). Там можно любую функцию агрегации взять, в моём случае достаточно записывать в один словарь. Надеюсь, оно так сработает.

Nick
05.09.2017
14:10:37
https://www.humblebundle.com/books/data-science-books

Google
Nick
05.09.2017
14:10:43
надо брать

Dmitry
05.09.2017
14:11:27
найс

Andrey
05.09.2017
14:39:07
Всем привет, есть вопросик, возможно нубский, насчёт обработки данных спарком. Есть метрики записанные в csv формата (metric_id, object_id, timestamp, value). Сочетание (metric_id, object_id, timestamp) можно считать уникальным. Надо преобразовать csv таким образом чтобы в конечном итоге получился dataframe - (object_id, timestamp, <metric_0>, <metric_1>, ..., <metric_N>), где названия колонок metric_* известны заранее и совпадают со значениями из metric_id, а значения в этих колонках берутся из value. Хотелось бы собирать такой датафрейм в один проход по csv, например, создавать RDD такой же структуры как csv и потом делать filter -> map -> groupBy(object_id, timestamp) -> sum(*metrics). Это работает если значения численные - во время map пишу ноль во все метрики кроме данной и потом каждое значение складывается с нолями в groupBy -> sum. Но это не поможет с текстовыми значениями и вообще выглядит костыльно. Может есть более правильный способ решения такой задачи?
Если известны заранее, то можно разделить датафрейм на несколько датафреймов с разными метриками и поджойнить их по ключу object_id. В целом это обработка мультилайна получается

если я все правильно понял

Первый dataframe будет val objects = df.select('object_id, 'timestamp).distinct

дальше будет несколько df с метриками типа val metric0 = df.filter('metric_id === x).select('object_id, 'value.alias("metric_0"))

потом джойн val joined = objects.join(metric0, Seq("object_id"))

так на выходе получишь таблицу, в которой уникальный object_id будет только в одной строчке и все твои метрики будут в ней же

Andrey
05.09.2017
14:46:38
там еще value надо добавить

Ilia
05.09.2017
14:46:51
А, ну да

Andrey
05.09.2017
14:46:52
а во втором тебе timestamp не нужен

тк он в objects будет

Ilia
05.09.2017
14:47:16
Ага, понял

Andrey
05.09.2017
14:48:01
ну это если у тебя timestamp один. Если он не один, то надо понимать, какой ты хочешь врезультирущей таблице

Ilia
05.09.2017
16:26:11
ну это если у тебя timestamp один. Если он не один, то надо понимать, какой ты хочешь врезультирущей таблице
Таймстамп один на несколько метрик и много для одного объекта. Повторные метрики с тем же таймстампом можно отбрасывать.

Andrey
05.09.2017
16:26:49
тогда его можно в objects

а в остальных df отбрасывать

Ilia
05.09.2017
16:27:35
Вообще вроде разобрался, спасибо. Значит, делать много датафреймов и джойнить их это норм? С точки зрения производительности

Andrey
05.09.2017
16:27:49
они большие?

Google
Ilia
05.09.2017
16:29:02
Ну, да, там на самом деле не один csv а много просто все имеют одинаковую структуру

Andrey
05.09.2017
16:29:45
а в количестве событий на джобу есть оценка?

Ilia
05.09.2017
16:30:21
Нет, а как можно посчитать?

Страница 33 из 182