Have watched all your videos. Seriously Gold content. Requesting not to stop making videos.
@afaqueahmad7117
5 ай бұрын
Thank you @dileepkumar-nd1fo for the kind words, it means a lot to me :)
@akshayshinde3703
8 ай бұрын
Really good explanation Afaque. Thank you for making such in depth videos.😊
@sureshpatchigolla3438
6 ай бұрын
Great work brother......... Thank you for explaining concepts in detail ❤❤
@afaqueahmad7117
6 ай бұрын
Appreciate it, @sureshpatchigolla3438 :)
@abhisheknigam3768
Ай бұрын
Industry level content.
@tahiliani22
8 ай бұрын
Commenting so that you continue making such informative videos. Great work!
@anandchandrashekhar2933
4 ай бұрын
Wow! Thank you Afaque, this is incredible content and very helpful!
@afaqueahmad7117
3 ай бұрын
Appreciate it @anandchandrashekhar2933, thank you!
@Momofrayyudoodle
5 ай бұрын
Thank you so much. Keep up the good work. Looking forward for more such videos to learn Spark
@afaqueahmad7117
5 ай бұрын
Thank you @Momofrayyu-pw9ki, really appreciate it :)
@purnimasharma9734
4 ай бұрын
Very nice explanation! Thank you for making this video.
@afaqueahmad7117
4 ай бұрын
Thanks @purnimasharma9734, appreciate it :)
@2412_Sujoy_Das
8 ай бұрын
Great share sir, the optimal shuffle size..... Please bring more scenario basef Questions as well as best production based practises!!!!
@mirli33
21 күн бұрын
Currently watching your playlist one by one. Great content. Very detailed explanation. In the first scenario you had 5 executors and with 4 cores each. If you have 1500 shuffle partition how they are going to be accommodated.
@afaqueahmad7117
9 күн бұрын
Hey @mirli33, glad you're finding the playlist helpful. Regarding the question, when you have 1500 shuffle partitions and 20 cores (5 x 4 = 20). 1 core takes up 1 partition, so having a total of 20 cores, first round will process 20 partitions. There will be a total of 1500/20 = 75 rounds of processing
@rgv5966
3 ай бұрын
I don't think this kind of videos are available on Spark anywhere else. Great work Afaque!
@afaqueahmad7117
2 ай бұрын
Appreciate it @rgv5966, thank you!
@vijaykumar-b6i7t
Ай бұрын
i like very much of your videos, it's insightful. can you please make series/videos on Spark interview oriented questions. Thanks in advance
@ashokreddyavutala8684
8 ай бұрын
thanks a bunch for the great content again....
@vinothvk2711
8 ай бұрын
Great Explanation!
@Wonderscope1
8 ай бұрын
Thanks for video, very informative
@fitness_thakur
2 ай бұрын
could you please make video on stack overflow like what are scenario when it can occur and how to fix it
@afaqueahmad7117
Ай бұрын
Are you referring to OOM (out of memory errors) - Driver & Executor?
@fitness_thakur
Ай бұрын
@@afaqueahmad7117 No, basically when we have multiple layers under single session then at that time stack memory getting full so to break it we have to make sure we are using one session per layer. e.g- suppose we have 3 layers (internal, external, combined) and if you run these in single session then it will throw stackoverflow error at any place whenever its stack get overflow. We tried to increase stack as well but that was not working. Hence at the last we come up with approach like will run one layer and then close session likewise
@tanushreenagar3116
2 ай бұрын
perfect video
@Rafian1924
6 ай бұрын
Learning from masters❤❤❤
@asokanramasamy2087
6 ай бұрын
Clearly explained!!
@afaqueahmad7117
6 ай бұрын
Appreciate it :)
@mahendranarayana1744
Ай бұрын
Great explanation, Thank you, But how would we know how to configure exact (at least best) "spark.sql.shuffle.partitions" at run time? Because each run/day the volume of the data is going to be changed. So, how do we determine the data volume at run time to set the shuffle.partitions number?
@afaqueahmad7117
9 күн бұрын
Hey @mahendranarayana1744, good question! There are 2 ways to solve this problem. With the new Spark versions (>= 3.0.0), you can let AQE handle the sizing which would according split partitions if the size per partition is huge or combine (coalesce) partitions if the sizes per partitions is small Another approach to solve the problem is estimating the size of the dataset at runtime and then dividing the size by the optimal partition size (100-200 MB) so that each partition has an optimal size. I've referenced about how to estimate the size of datasets in the "Bucketing" video here: kzitem.info/news/bejne/kqGN0mqaaquWfnY You can refer to the exact algorithm on sizing datasets here: umbertogriffo.gitbook.io/apache-spark-best-practices-and-tuning/parallelism/sparksqlshufflepartitions_draft
@puneetgupta003
4 ай бұрын
Nice content Afaque !
@afaqueahmad7117
4 ай бұрын
Appreciate it @puneetgupta003, thank you!
@ravikrish006
8 ай бұрын
Thank you it is very useful
@dasaratimadanagopalan-rf9ow
2 ай бұрын
Thanks for the content, really appreciate it. My understanding is AQE take care of Shuffle Partition Optimization and we don't need to manually intervene (starting spark 3) to optimize shuffle partitions. Could you clarify this please?
@iamkiri_
8 ай бұрын
Nice explanation -)
@abdulwahiddalvi7119
3 ай бұрын
@Afaque thank you for making these videos. Very helpful. I have questions how do we estimate the data size? We run our batches/jobs on spark and each batches could be processing varying size of data. Some batches could be dealing with 300Gb and some could be 300Mb. How do we calculate optimal number of shuffle partitions?
@showbhik9700
3 ай бұрын
Lovely!
@nikhillingam4630
2 ай бұрын
Consider a scenario where my first data shuffle size is 100gb then giving more shuffle partitions make sense now in the last shuffle data size is drastically reduced to 10gb according to calculations how would be to give shuffle partitions giving 1500 would benefit for the first shuffle and not for the last shuffle. How do one approach this scenario
@vikastangudu712
5 ай бұрын
Thanks for the explanantion, But Isn't the parameter(spark.sql.shuffle.partitions) is no way dependent on the cradinality of the group by/ join column ?
@tandaibhanukiran4828
7 ай бұрын
Thank you for explaining in detail.You are the best guy around. Can you also please explain me if there is a way to dynamically update the shuffle partition with the help of dynamic calculations of size and no. of cores in the cluster(if in case the cluster is altered in future). Thanks in advance.
@afaqueahmad7117
7 ай бұрын
@tandaibhanukiran4828 I believe it's challenging to be able to dynamically configure shuffle partitions only knowing the size of your data and cluster configuration. The most important input is the "Shuffle Write". Estimating shuffle write is not very clear-cut as it depends on several factors (skew, transformation code complexity i.e. joins, aggregations, dynamic execution plans etc..) If you have historical data, or similar jobs (using same/similar) datasets with similar operations i.e. joins aggregations, you could use those "Shuffle Partition" numbers and apply the logic (as demonstrated in the scenarios) to dynamically get the number of shuffle partitions. However, I would stress to use this approach with caution.
@tandaibhanukiran4828
6 ай бұрын
Thank you very much.@@afaqueahmad7117
@akshaybaura
8 ай бұрын
in scenario 1, how exactly is reducing the size of partitions beneficial ? Correct me if I'm wrong, in case we let a core process 1.5 GB, most of the data will spill to disk for computation which will increase IO and hence increase time taken for completion. However, in case we reduce the partition size, we increase the number of partitions as a result which again would increase the time taken for job completion.
@kartikthakur6461
8 ай бұрын
I am assuming Disk computation will take much longer to complete.
@afaqueahmad7117
8 ай бұрын
Hey @akshaybaura, @kartikthakur6461, you're on the right track. Reducing each partition from 1.5g to 200mb does increase the number of partitions, but it is beneficial for the following important reasons: 1. Reduced memory pressure: when a core processes a large partition (1.5g), it's more likely to run out of memory and start spilling to disk. This spill-over is going to cause increased IO operations which in turn is significantly going to slow down processing. However, I would still emphasize on the fact that spill would depend on the memory per core. If memory per core is > 1.5g, spills won't happen but processing is going to be significantly slow. 2. Better resource utilisation: increasing the number of partitions allows better distribution of workload across the cores. 3. Balanced workload: Smaller partitions help in achieving a more balanced workload across cores. The importance is in ensuring that each core is given to process a "manageable" amount of data. However, the goal is to strike a balance - partition sizes should be small enough to avoid excessive memory usage and I/O spillover but large enough to ensure that the overhead of managing many partitions doesn’t outweigh the benefits.
@crepantherx
8 ай бұрын
can you please cover bucketing handson in adb(handson with file view). In your last video it is working in your IDE but not in databricks. (delta bucketing not allowed)
@leonardopetraglia6040
2 ай бұрын
Thanks for the video! I also have a question: when I execute complex query, there will be multiple stage with different shuffle write sizes, which do I have to take in consideration for the computation of the optimal number of shuffle partitions?
@afaqueahmad7117
9 күн бұрын
Hey @leonardopetraglia6040, it's best to start with the largest shuffle write across all stages
@arunkindra832
8 ай бұрын
Hi Afaque, in 1st case, when you have configured 1500 shuffle partitions, but initially you have said 1000 cores available in a cluster, and you have also mentioned about one partition per core. Then from where we got rest 500 partitions? Another doubt, do we need to configure no of cores consumed by a job according to the shuffle partitions we provide? Also, please explain a case where we don't have enough cores available in the cluster.
@afaqueahmad7117
8 ай бұрын
Hey @arunkindra832, in scenario 1, referring to the diagram, there are 20 cores in the cluster (5 executors * 4 core each). 1500 shuffle partitions are going to be processed by a 20 core cluster. Assuming each core is going to take the same amount of time to process a shuffle partition and the distribution of shuffle partitions is uniform, there's approximately going to be 1500/20 = 75 rounds. In 1 round, 20 shuffle partitions are going to be processed.
@JustDinesh-1934
21 күн бұрын
I have learned somewhere that the max partition size can only be 128mb in spark. Isnt that contradict to what you mentined when explaining about 300GB example? Just asking to Correct myself if wrong.
@afaqueahmad7117
9 күн бұрын
Hey @JustDinesh-1934, good question! Anything between 100-200 MB is a reasonable number for optimal performance, but it can depend on several factors, specially your cluster, number of cores, total memory and memory available per core; As long as the "execution memory" per core > partition size, things should work just fine :)
@YounisShaik-f1j
8 ай бұрын
@afaque shuffle partition will consist of both the shuffled data (keys that were not originally present in the executor and were shuffled to the partition) and the non-shuffled data (keys that were already present in the executor and were not shuffled). So, the size of the shuffle partition cannot be directly calculated from the shuffle write data alone,as it also depends on the distribution of the data across the partitions ?
@AshishStudyDE
25 күн бұрын
Sir still waiting for a dedicated video for driver and executor omm in very detailed. Question on if the file is 100 gb, and can we sort it? If yes will there be data spill, basically a interview quesion for 8+ year exp
@afaqueahmad7117
9 күн бұрын
Hey @AshishStudyDE, hopefully more content coming soon; The answer to the question would depend a lot on the resource configuration i.e. no. of cores, memory, execution memory per core. If the "execution memory" per core > partition size (assuming 100gb file is divided into partitions), there shouldn't be problems processing the files If you would like to know more about "Execution memory", please refer to the Memory Management video here: kzitem.info/news/bejne/1I6Cl6Wdq4Kqppw
@tsaha100
8 ай бұрын
@Afaque : Very good video. So in real life for varying work load size ( shuffle write size 50mb - 300GB) you have to change the shuffle partition size programmatically ? How do you figure out the shuffle write in the code which find in the spark UI? Is there any solution?
@afaqueahmad7117
8 ай бұрын
Thanks for the kind works @tsaha100. I don't think there's a clear way to estimate the shuffle write statically which is shown on the Spark UI using code , because of dynamic nature of Spark's execution. If you would like to log the Shuffle write metrics when your task completes, you could try attaching the SparkListener to your SparkContext and override onTaskEnd method to capture shuffle write metrics, but I believe it's just easier to run and refer to the Spark UI. You can refer: books.japila.pl/apache-spark-internals/SparkListenerInterface/#ontaskgettingresult
@ramvel814
7 ай бұрын
Can you tell how to resolve Python worker exited unexpectedly (crashed)
@Kiedi7
7 ай бұрын
Hey man, awesome series so far. I noticed in your videos that you share your mac screen but use an apple pencil on your ipad to annotate? Can you describe that setup on how you’re able to annotate on your Jupyter Notebook (presented on mac) but from an ipad instead? Thanks in advance appreciate it
@afaqueahmad7117
7 ай бұрын
Thank you! I use Ecamm Live so adding both iPad and Mac as a screen helps me navigate easily between both. For the ones, where I’ve annotated on Notebooks, I’ve used Zoom to share my screen and annotate on Zoom.
@atifiu
8 ай бұрын
@afaque how can I calculate total shuffle data size without executing the code before hand. Also size in disk is not same as in memory as memory data is uncompressed.
@afaqueahmad7117
8 ай бұрын
Hey @atifiu, calculating total shuffle data size without executing the code can be challenging due to dynamic nature of Spark's execution. There are several things which would come into picture for example: data distribution, skew, nature of transformations (wide vs narrow) depending on which you may / may not get an accurate shuffle data size;
@kaushikghosh-po1ew
5 ай бұрын
i have a question in this. Let's say that the data volume that i am processing varies on daily basis i..,e someday it can be 50gb someday it can be 10gb. keeping in mind the 200mb per shuffle partition limit the num of partition for optimum partition should change on each run in that case. But it;s not practically possible to change the code every time to have a proper shuffle partition. How should this scenario be handled ? i read about a parameter sql.files.maxPartitionBytes which is defaulted to 128mb. Should i change this to 200 and let the number of shuffle partition be calculated automatically ? In that case will the value under sql.shuffle.partitions be ignored ?
@VenkatakrishnaGangavarapu
8 ай бұрын
thanks again for the topic. i worried that you stopped ... because this deapth knowledge can not even get from colleauges
@afaqueahmad7117
8 ай бұрын
@user-dx9qw3cl8w More to come! :)
@omairaparveen2001
8 ай бұрын
waiting eagerly :)
@Revnge7Fold
3 ай бұрын
I think its a bit dumb for spark to keep this value static... why not rather have a "target shuffle size(mb/gb)" config in spark. I wish the spark planner was a bit more sophisticated.
@afaqueahmad7117
3 ай бұрын
You could get a similar effect by turning on AQE and setting "spark.sql.adaptive.advisoryPartitionSizeInBytes" to your desired size. Documentation here: spark.apache.org/docs/latest/sql-performance-tuning.html
@Revnge7Fold
3 ай бұрын
@@afaqueahmad7117 Awesome! Thanks for the advice! Your videos have been really helpful!
@vinothvk2711
8 ай бұрын
In Scnario 2 - Finally, we are distributing 4.2 mb for each core. In this case whats the spark.sql.shuffle.partitions . Is it 12?
@afaqueahmad7117
8 ай бұрын
Yes @vinothvk2711, it's 12. As explained, this is going to ensure 100% utilisation of the cluster.
Пікірлер: 73