
Daniel
17.08.2018
09:58:14
завязывайте флуд
особенно касается мужика в трусах на диване

Sergioss
17.08.2018
11:25:55
парни я тут возился с портами 8020 и 9000
в опциях
dfs.namenode.rpc-address // hdfs-site
fs.defaultFS // core-site
И странным образом заметил что пока возился значения на нодах были разные, сейчас файл лежит на одной дата ноде а еще было видмо как правильно на двух размазан, в итоге с портами разобрался, были проблемы и ошибки, но файлы как то не правильно ложаться, в логах ошибок нет.

Google

Sergioss
17.08.2018
11:26:21
Вопрос, где я накураебить мог в этот раз=)
подскажите пожалуйста.

Andrey
17.08.2018
11:54:41
Привет, подскажите пожалуйста, запускаю спарк джобу под ярном в емр. постоянно падает с 137 ошибкой и #
# java.lang.OutOfMemoryError: Java heap space
-- собственно вопрос правильно ли я понимаю, что если бы внутри экзекьютора (контейнера не хватало памяти, он бы хотел болше чем разрешено, то это прибивалось бы с другой ошибкой типа container killer by request) а когда падает по out of heap space это всегда про то что на инстансе физически не хватает памяти для выделения под контейнер? согласно ганглии памяти больше чем достаточно

Александр
17.08.2018
11:56:09
а сколько Вы памяти выдали джобе ?

Andrey
17.08.2018
11:56:56
последний раз запускал с такими параметрами val sparkExecutorSettings = SparkExecutorSettings(
sparkDriverCores = 2,
sparkCoresMax = 14,
sparkDriverMemoryGb = 12,
sparkExecutorCores = 20,
sparkExecutorMemoryGb = 40,
sparkExecutorInstances = 2
)

Stanislav
17.08.2018
11:58:22

Andrey
17.08.2018
11:59:10
а почему его ярн не прибил? в чем разница?

Yaroslav
17.08.2018
11:59:25

Stanislav
17.08.2018
11:59:30
Ради интереса, включи динамик алокейшн для джобы вместо жёстких параметров

Yaroslav
17.08.2018
11:59:35
если я ничего не путаю

Stanislav
17.08.2018
12:00:25

Andrey
17.08.2018
12:00:41
ок, спасибо, ушел пробовать

Александр
17.08.2018
12:02:52

Google

Stanislav
17.08.2018
12:03:36

Александр
17.08.2018
12:04:09
джоба разве не отдельный jvm процесс ?

Sergioss
17.08.2018
12:07:12
отдельный

Andrey
17.08.2018
12:21:10
у спарка есть параметр "spark.dynamicAllocation.enabled=true", про него идет речь
походу я не там тюнил: боюсь сглазить но судя по всему драйверу памяти не хватало а не ехзекьюторам

Grigory
17.08.2018
12:21:53
ты выделяешь макс квоту жвмке
есть вероятность (почти никакая) что она все юзать не будет
или разъясни что знач динамический хип

Alexey
17.08.2018
12:26:33
драйверу же можно отдельной настройкой память проставлять.
btw на emr есть ещё параметр maximizeResourceAllocation — он отдаёт под драйвер весь инстанс целиком, если true

Grigory
17.08.2018
12:27:35

Александр
17.08.2018
12:27:46
я возможно натянул сову на глобус и подумал про хип

Stanislav
17.08.2018
12:42:55

Renarde
17.08.2018
13:11:27
всем привет.
Обратил внимание что одна из задач на спарке дает следующее поведение - очень сильно скашивает данные на одном из тасков.
Например, на вход ей подаются 3-4 таблицы - из них одна главная, довольно большого объема (детальная), вторая - агрегат, и два словаря по 30/40 значений.
Все входные данные читаются и делается repartition + persist(DISK_ONLY), после чего идет три джойна.
На последнем стейдже приличный кусок данных (около 50% от входа) обрабатывается одним таском, остальные распределены чуть ли не по 1-2% данных на таск.
Как бороться с таким поведением? Spark 2.2.1,CDH, SparkContext запущен в статике

Евгений
17.08.2018
13:24:07
GroupBy(join_keys).count().orderBy(desc("count")).limit(10) по каждой из таблиц, ищите дублирующиеся ключи (то есть count > 1), если в одном дф например 1000 записей с ключом null, а в другом 2000, то после джойна по ключу будет 2 миллиона

Grigory
17.08.2018
13:33:23
да уж; читать надо хорошо данные

Renarde
17.08.2018
13:34:53

Евгений
17.08.2018
13:35:30
А нужны вам результаты джойна дублирующих ключей?
Может сагрегировать как-то до джойна?

Google

Renarde
17.08.2018
13:38:00
в том то и дело, что да - фильтрация навешивается уже потом от другой таблицы...

Евгений
17.08.2018
13:39:15
Ну если там проблемы с одним ключом, то можно его отдельно обработать
Уже кроссджойном

Alexey
17.08.2018
14:09:21
интересный класс задач с такими джойнами.
вот у нас, например, надо джойнить датасет в несколько миллиардов записей с датасетом в примерно миллион записей, и потом считать некоторые метрики на каждую пару.
декартово произведение такого размера вообще никуда не влазит.
мы в итоге додумались а) предварительно фильтровать записи, результат обработки которых будет заведомо бесполезен, и б) бить левый датасет на партиции. а правый датасет запаковываем в обычную мапу, и бродкастим внутрь левыйДатасет.mapPartitionsToPair(). кое-как выползли из ситуации.
вообще, неприятно. как лучше сделать — хз.

Andrey
17.08.2018
14:54:36
если в кратце, то бродкастить и фильтровать где можно

Sergioss
17.08.2018
16:24:30

Рамиль
17.08.2018
16:59:10


sherzod
19.08.2018
13:14:33
Всем привет. Кто-нить сталкивался с проблемой JOIN-а больших таблиц с фильтром в котором участвуют обе таблицы и находил ли какое решение?
Например есть, высокая (> 1млрд) таблица и поменьше, в обоих есть координаты и мы джойним по условию distance(t1.pos, t2.pos) < threshold. То есть в фильтре участвую сразу обе таблицы.
В спарке насколько я понимаю сейчас нет возможности использовать UDF в который передаются поля из двух источников, советуют делать
t1.crossJoin(t2).where(dist(t1.pos, t2pos) < threshold)
Но декартово произведение немаленьких таблиц не влезает никуда.
Я так предполагаю что там просто не хватает оперативки для массивов во внутренней операции группировки полей по условию
t1.row - array(t2 corresponding rows)
Но хотелось бы лучше понять ситуацию.

Евгений
19.08.2018
14:18:32
Можно разбивать пространство на кубики и делать джойн по кубикам
Тогда не будет кросс-джойна

sherzod
19.08.2018
14:19:17
ох интересная идея..

Евгений
19.08.2018
14:19:57
Ну то есть никто не отменял того, что если dist(A, B) < D, то abs(Xa - Xb) < D
И с любой осью так
Это позволяет сильно ограничить пересечение

sherzod
19.08.2018
14:22:29
спасибо, попробую последовательно последнюю и первую идейку, классно

Евгений
19.08.2018
14:23:25
Наверняка есть куча нормальных реализаций, но вот моя корявая версия для начала)
http://ceur-ws.org/Vol-1763/paper19.pdf

sherzod
19.08.2018
14:25:18
Интересно, спасибо, почитаю

Google

Евгений
19.08.2018
14:26:11
Главное, учесть, что объекты, которые рядом, могут разбежаться по разным кубикам
Мы просто дублировали их в соседние кубики, расстояние до которых было меньше заданного

sherzod
19.08.2018
14:26:57
да, получается некая рандомизация
которую наверное можно уменьшить ещё одним джойном по сдвинутым кубикам

Евгений
19.08.2018
14:28:18
Ну можно не сдвигать, а просто засунуть объект во все кубики, с которыми пересекается его окрестность

sherzod
19.08.2018
14:28:30
да, только об этом подумал

Евгений
19.08.2018
14:28:32
Это не такое большое дублирование получается

Grigory
19.08.2018
14:28:41
главное не забыть потом что дублирование может быть)

Евгений
19.08.2018
14:33:17
Не, не может
Если дублировать только один источник
А второй строго по кубикам

Grigory
19.08.2018
14:36:33
хм
понял

Sergey
20.08.2018
08:33:07
привет! поделитесь опытом: как реализовать гарантированность метки времени при вставке в Hive-таблицы? потребовалось дать сайнтистам возможность самим инсертить данные в batch layer.
default-значения столбцов в Hive появились лишь в версии 3.0
сайнтисты работают с Hive через odbc-драйвер и через pyhive (и если pyhive я еще могу допилить, то пересобирать odbc-драйвер ващще неохота).

Renarde
20.08.2018
08:37:51

Sergey
20.08.2018
08:38:25
ок, что это меняет с точки зрения ответа на вопрос?
и да, insert into view в Hive не поддерживается

Renarde
20.08.2018
08:39:11
ну там есть функционал:
sdf.write.format('orc').insertInto(...)

Sergey
20.08.2018
08:41:00
так вопрос не в том, чтобы инструкции прописать для пользователей. что следует всегда добавлять "_inserted_ts" в код вставки, а как застраховаться от того. что они не введут current_timestamp или не введут что-нибудь левое

Google

Renarde
20.08.2018
08:47:04
хм, и вправду - не простая задача.
Имхо кажется что быстрый и грязный способ - разрешить им бросать запись только в какой-нибудь KafkaConsumer и оттуда системно писать в таблицу с добавлением current_timestamp
а прямую запись в таблицу залочить (хотя кажется что это немного оверкилл)

Ilia
20.08.2018
08:50:01
Можно настроить репликацию из write-only в read-only с искусственным добавлением _inserted_ts к каждой строке