
Commit 390f15e

Refactor create_spark_dev function and update API request in startup.py
1 parent b1872c1 commit 390f15e

1 file changed: 24 additions, 25 deletions

docker/notebook/startup.py

Lines changed: 24 additions & 25 deletions
@@ -80,14 +80,10 @@ def set_env():
 
 class PawMarkSparkSession:
 
-    def __init__(self, spark_session):
+    def __init__(self, config_json, spark_session):
         self._spark_session = spark_session
+        self._config_json = config_json
         self.history_server_base_url = "http://localhost:18080"
-        try:
-            self.config_json = requests.get("http://server:5002/spark_app/config").json()
-            self.load_config()
-        except Exception as e:
-            self.config_json = 'Error loading config: ' + str(e)
 
     def __getattr__(self, name):
         return getattr(self._spark_session, name)
@@ -109,28 +105,31 @@ def _repr_html_(self):
             <p><strong>Spark UI:</strong> <a href="{spark_ui_link}">{spark_ui_link}</a></p>
         </div>
         """
-
-    def load_config(self):
-        for key, value in self.config_json.items():
-            self._spark_session.conf.set(key, value)
 
 def create_spark_dev():
     logger.info("Creating Spark session")
-
-    spark = PawMarkSparkSession(SparkSession.builder \
-        .appName("PySpark Example") \
-        .master("spark://spark-master:7077") \
-        .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.0.0") \
-        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
-        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
-        .config("spark.eventLog.enabled", "true") \
-        .config("spark.eventLog.dir", "/opt/data/spark-events") \
-        .config("spark.history.fs.logDirectory", "/opt/data/spark-events") \
-        .config("spark.sql.warehouse.dir", "/opt/data/spark-warehouse") \
-        # .config("executor.memory", "1g") \
-        # .config("executor.cores", "1") \
-        # .config("spark.executor.instances", "1") \
-        .getOrCreate())
+    try:
+        config_json = requests.get("http://server:5002/spark_app/config").json()
+    except Exception as e:
+        config_json = 'Error loading config: ' + str(e)
+
+    spark = PawMarkSparkSession(
+        config_json,
+        SparkSession.builder \
+        .appName("PySpark Example") \
+        .master("spark://spark-master:7077") \
+        .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.0.0") \
+        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
+        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
+        .config("spark.eventLog.enabled", "true") \
+        .config("spark.eventLog.dir", "/opt/data/spark-events") \
+        .config("spark.history.fs.logDirectory", "/opt/data/spark-events") \
+        .config("spark.sql.warehouse.dir", "/opt/data/spark-warehouse") \
+        .config("executor.memory", config_json['executor.memory']) \
+        .config("executor.cores", config_json['executor.cores']) \
+        .config("spark.executor.instances", config_json['spark.executor.instances']) \
+        .getOrCreate()
+    )
 
     return spark
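For context, below is a minimal sketch of the payload the http://server:5002/spark_app/config endpoint is assumed to return, together with a hypothetical notebook usage of the refactored create_spark_dev(). The keys mirror the config_json[...] lookups in the builder chain above, and the values reuse the defaults from the previously commented-out .config() calls; neither is taken from the live server. Note also that when the request fails, config_json falls back to an error string, so the subscript lookups in the builder chain would raise a TypeError instead of building a session.

# Illustrative assumption: a flat JSON object served by GET /spark_app/config.
# The keys must match the config_json[...] lookups in create_spark_dev();
# the values are example defaults, not the server's actual response.
EXAMPLE_CONFIG = {
    "executor.memory": "1g",
    "executor.cores": "1",
    "spark.executor.instances": "1",
}

# Hypothetical notebook usage: PawMarkSparkSession proxies any attribute it
# does not define to the wrapped SparkSession via __getattr__, so the wrapper
# behaves like a regular session.
spark = create_spark_dev()
spark.sql("SELECT 1").show()  # delegated to the underlying SparkSession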

0 commit comments