@hadoopusers

Страница 135 из 182
Daniel
17.08.2018
09:58:14
завязывайте флуд

особенно касается мужика в трусах на диване

Sergioss
17.08.2018
11:25:55
парни я тут возился с портами 8020 и 9000 в опциях dfs.namenode.rpc-address // hdfs-site fs.defaultFS // core-site И странным образом заметил что пока возился значения на нодах были разные, сейчас файл лежит на одной дата ноде а еще было видмо как правильно на двух размазан, в итоге с портами разобрался, были проблемы и ошибки, но файлы как то не правильно ложаться, в логах ошибок нет.



Google
Sergioss
17.08.2018
11:26:21


Вопрос, где я накураебить мог в этот раз=) подскажите пожалуйста.

Andrey
17.08.2018
11:54:41
Привет, подскажите пожалуйста, запускаю спарк джобу под ярном в емр. постоянно падает с 137 ошибкой и # # java.lang.OutOfMemoryError: Java heap space -- собственно вопрос правильно ли я понимаю, что если бы внутри экзекьютора (контейнера не хватало памяти, он бы хотел болше чем разрешено, то это прибивалось бы с другой ошибкой типа container killer by request) а когда падает по out of heap space это всегда про то что на инстансе физически не хватает памяти для выделения под контейнер? согласно ганглии памяти больше чем достаточно

Александр
17.08.2018
11:56:09
а сколько Вы памяти выдали джобе ?

Andrey
17.08.2018
11:56:56
последний раз запускал с такими параметрами val sparkExecutorSettings = SparkExecutorSettings( sparkDriverCores = 2, sparkCoresMax = 14, sparkDriverMemoryGb = 12, sparkExecutorCores = 20, sparkExecutorMemoryGb = 40, sparkExecutorInstances = 2 )

Andrey
17.08.2018
11:59:10
а почему его ярн не прибил? в чем разница?

Stanislav
17.08.2018
11:59:30
Ради интереса, включи динамик алокейшн для джобы вместо жёстких параметров

Yaroslav
17.08.2018
11:59:35
если я ничего не путаю

Stanislav
17.08.2018
12:00:25
а почему его ярн не прибил? в чем разница?
Ты хочешь обработать больше данных, чем выделил в ярне ресурсов. Ярн - просто даёт эти ресурсы

Andrey
17.08.2018
12:00:41
ок, спасибо, ушел пробовать

Александр
17.08.2018
12:02:52
Ради интереса, включи динамик алокейшн для джобы вместо жёстких параметров
подскажите, а что гуглить чтобы прочитать про динамическое выделения heapa для java процессов ? мой гугл говорит что jvm так не умеет

Google
Александр
17.08.2018
12:04:09
джоба разве не отдельный jvm процесс ?

Sergioss
17.08.2018
12:07:12
отдельный

Andrey
17.08.2018
12:21:10
у спарка есть параметр "spark.dynamicAllocation.enabled=true", про него идет речь

походу я не там тюнил: боюсь сглазить но судя по всему драйверу памяти не хватало а не ехзекьюторам

Grigory
17.08.2018
12:21:53
ты выделяешь макс квоту жвмке

есть вероятность (почти никакая) что она все юзать не будет

или разъясни что знач динамический хип

Alexey
17.08.2018
12:26:33
драйверу же можно отдельной настройкой память проставлять. btw на emr есть ещё параметр maximizeResourceAllocation — он отдаёт под драйвер весь инстанс целиком, если true

Grigory
17.08.2018
12:27:35
Александр
17.08.2018
12:27:46
что значит бинамическое выделение хипа?
это был вопрос на ответ про динамическое выделение параметров жеж

я возможно натянул сову на глобус и подумал про хип

Renarde
17.08.2018
13:11:27
всем привет. Обратил внимание что одна из задач на спарке дает следующее поведение - очень сильно скашивает данные на одном из тасков. Например, на вход ей подаются 3-4 таблицы - из них одна главная, довольно большого объема (детальная), вторая - агрегат, и два словаря по 30/40 значений. Все входные данные читаются и делается repartition + persist(DISK_ONLY), после чего идет три джойна. На последнем стейдже приличный кусок данных (около 50% от входа) обрабатывается одним таском, остальные распределены чуть ли не по 1-2% данных на таск. Как бороться с таким поведением? Spark 2.2.1,CDH, SparkContext запущен в статике

Евгений
17.08.2018
13:24:07
GroupBy(join_keys).count().orderBy(desc("count")).limit(10) по каждой из таблиц, ищите дублирующиеся ключи (то есть count > 1), если в одном дф например 1000 записей с ключом null, а в другом 2000, то после джойна по ключу будет 2 миллиона

Grigory
17.08.2018
13:33:23
да уж; читать надо хорошо данные

Renarde
17.08.2018
13:34:53
GroupBy(join_keys).count().orderBy(desc("count")).limit(10) по каждой из таблиц, ищите дублирующиеся ключи (то есть count > 1), если в одном дф например 1000 записей с ключом null, а в другом 2000, то после джойна по ключу будет 2 миллиона
да, так и есть - в итоге огромные партиции спиллятся по 30-40 гигабайт на диск, а остальные ядра простаивают.. А есть какие-то варианты обойти такое вот?

Евгений
17.08.2018
13:35:30
А нужны вам результаты джойна дублирующих ключей?

Может сагрегировать как-то до джойна?

Google
Renarde
17.08.2018
13:38:00
в том то и дело, что да - фильтрация навешивается уже потом от другой таблицы...

Евгений
17.08.2018
13:39:15
Ну если там проблемы с одним ключом, то можно его отдельно обработать

Уже кроссджойном

Alexey
17.08.2018
14:09:21
интересный класс задач с такими джойнами. вот у нас, например, надо джойнить датасет в несколько миллиардов записей с датасетом в примерно миллион записей, и потом считать некоторые метрики на каждую пару. декартово произведение такого размера вообще никуда не влазит. мы в итоге додумались а) предварительно фильтровать записи, результат обработки которых будет заведомо бесполезен, и б) бить левый датасет на партиции. а правый датасет запаковываем в обычную мапу, и бродкастим внутрь левыйДатасет.mapPartitionsToPair(). кое-как выползли из ситуации. вообще, неприятно. как лучше сделать — хз.

sherzod
19.08.2018
13:14:33
Всем привет. Кто-нить сталкивался с проблемой JOIN-а больших таблиц с фильтром в котором участвуют обе таблицы и находил ли какое решение? Например есть, высокая (> 1млрд) таблица и поменьше, в обоих есть координаты и мы джойним по условию distance(t1.pos, t2.pos) < threshold. То есть в фильтре участвую сразу обе таблицы. В спарке насколько я понимаю сейчас нет возможности использовать UDF в который передаются поля из двух источников, советуют делать t1.crossJoin(t2).where(dist(t1.pos, t2pos) < threshold) Но декартово произведение немаленьких таблиц не влезает никуда.

Я так предполагаю что там просто не хватает оперативки для массивов во внутренней операции группировки полей по условию t1.row - array(t2 corresponding rows) Но хотелось бы лучше понять ситуацию.

Евгений
19.08.2018
14:18:32
Можно разбивать пространство на кубики и делать джойн по кубикам

Тогда не будет кросс-джойна

sherzod
19.08.2018
14:19:17
ох интересная идея..

Евгений
19.08.2018
14:19:57
Ну то есть никто не отменял того, что если dist(A, B) < D, то abs(Xa - Xb) < D

И с любой осью так

Это позволяет сильно ограничить пересечение

sherzod
19.08.2018
14:22:29
спасибо, попробую последовательно последнюю и первую идейку, классно

Евгений
19.08.2018
14:23:25
Наверняка есть куча нормальных реализаций, но вот моя корявая версия для начала) http://ceur-ws.org/Vol-1763/paper19.pdf

sherzod
19.08.2018
14:25:18
Интересно, спасибо, почитаю

Google
Евгений
19.08.2018
14:26:11
Главное, учесть, что объекты, которые рядом, могут разбежаться по разным кубикам

Мы просто дублировали их в соседние кубики, расстояние до которых было меньше заданного

sherzod
19.08.2018
14:26:57
да, получается некая рандомизация

которую наверное можно уменьшить ещё одним джойном по сдвинутым кубикам

Евгений
19.08.2018
14:28:18
Ну можно не сдвигать, а просто засунуть объект во все кубики, с которыми пересекается его окрестность

sherzod
19.08.2018
14:28:30
да, только об этом подумал

Евгений
19.08.2018
14:28:32
Это не такое большое дублирование получается

Grigory
19.08.2018
14:28:41
главное не забыть потом что дублирование может быть)

Евгений
19.08.2018
14:33:17
Не, не может

Если дублировать только один источник

А второй строго по кубикам

Grigory
19.08.2018
14:36:33
хм

понял

Sergey
20.08.2018
08:33:07
привет! поделитесь опытом: как реализовать гарантированность метки времени при вставке в Hive-таблицы? потребовалось дать сайнтистам возможность самим инсертить данные в batch layer. default-значения столбцов в Hive появились лишь в версии 3.0 сайнтисты работают с Hive через odbc-драйвер и через pyhive (и если pyhive я еще могу допилить, то пересобирать odbc-драйвер ващще неохота).

Sergey
20.08.2018
08:38:25
ок, что это меняет с точки зрения ответа на вопрос?

и да, insert into view в Hive не поддерживается

Renarde
20.08.2018
08:39:11
ну там есть функционал: sdf.write.format('orc').insertInto(...)

Sergey
20.08.2018
08:41:00
так вопрос не в том, чтобы инструкции прописать для пользователей. что следует всегда добавлять "_inserted_ts" в код вставки, а как застраховаться от того. что они не введут current_timestamp или не введут что-нибудь левое

Google
Renarde
20.08.2018
08:47:04
хм, и вправду - не простая задача. Имхо кажется что быстрый и грязный способ - разрешить им бросать запись только в какой-нибудь KafkaConsumer и оттуда системно писать в таблицу с добавлением current_timestamp

а прямую запись в таблицу залочить (хотя кажется что это немного оверкилл)

Ilia
20.08.2018
08:50:01
Можно настроить репликацию из write-only в read-only с искусственным добавлением _inserted_ts к каждой строке

Страница 135 из 182