스파크 데이터 프레임 열에서 최대값을 가져오는 가장 좋은 방법
스파크 데이터 프레임 열에서 가장 큰 값을 얻을 수 있는 가장 좋은 방법을 찾고 있습니다.
다음 예를 생각해 보십시오.
df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()
이는 다음을 생성합니다.
+---+---+
| A| B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
제 목표는 A열에서 가장 큰 값을 찾는 것입니다(검사상 3.0).PySpark를 사용하면 다음과 같은 네 가지 방법을 생각할 수 있습니다.
# Method 1: Use describe()
float(df.describe("A").filter("summary = 'max'").select("A").first().asDict()['A'])
# Method 2: Use SQL
df.registerTempTable("df_table")
spark.sql("SELECT MAX(A) as maxval FROM df_table").first().asDict()['maxval']
# Method 3: Use groupby()
df.groupby().max('A').first().asDict()['max(A)']
# Method 4: Convert to RDD
df.select("A").rdd.max()[0]
위의 각 항목은 정답을 제공하지만 Spark 프로파일링 도구가 없기 때문에 어느 것이 최선인지 알 수 없습니다.
스파크 런타임 또는 자원 사용 측면에서 위의 방법 중 어떤 것이 가장 효율적인지 또는 위의 방법보다 더 직접적인 방법이 있는지에 대한 직관 또는 경험론의 아이디어가 있습니까?
>df1.show()
+-----+--------------------+--------+----------+-----------+
|floor| timestamp| uid| x| y|
+-----+--------------------+--------+----------+-----------+
| 1|2014-07-19T16:00:...|600dfbe2| 103.79211|71.50419418|
| 1|2014-07-19T16:00:...|5e7b40e1| 110.33613|100.6828393|
| 1|2014-07-19T16:00:...|285d22e4|110.066315|86.48873585|
| 1|2014-07-19T16:00:...|74d917a1| 103.78499|71.45633073|
>row1 = df1.agg({"x": "max"}).collect()[0]
>print row1
Row(max(x)=110.33613)
>print row1["max(x)"]
110.33613
답은 method3과 거의 동일하지만 method3의 "asDict()"를 제거할 수 있는 것 같습니다.
데이터 프레임의 특정 열에 대한 최대값은 다음을 사용하여 얻을 수 있습니다.
your_max_value = df.agg({"your-column": "max"}).collect()[0][0]
비고: Spark는 빅 데이터 분산 컴퓨팅에서 작업하기 위한 것입니다.예제 DataFrame의 크기는 매우 작으므로 실제 예제의 순서를 작은 예제와 관련하여 변경할 수 있습니다.
가장 느림:방법_1, 이유는.describe("A")
최소값, 최대값, 평균, 표준 편차 및 카운트를 계산합니다(전체 열에 걸쳐 5회 계산).
매체: Method_4, 왜냐하면,.rdd
(DF에서 RDD로 변환) 프로세스 속도가 느려집니다.
더 빠른 속도: Method_3 ~ Method_2 ~ Method_5. 논리가 매우 유사하기 때문에 스파크의 촉매 최적화기는 최소한의 연산으로 매우 유사한 논리를 따릅니다(특정 열의 최대값 가져오기, 단일 값 데이터 프레임 수집;.asDict()
2, 3과 5를 비교하여 약간의 추가 시간 추가)
import pandas as pd
import time
time_dict = {}
dfff = self.spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
#-- For bigger/realistic dataframe just uncomment the following 3 lines
#lst = list(np.random.normal(0.0, 100.0, 100000))
#pdf = pd.DataFrame({'A': lst, 'B': lst, 'C': lst, 'D': lst})
#dfff = self.sqlContext.createDataFrame(pdf)
tic1 = int(round(time.time() * 1000))
# Method 1: Use describe()
max_val = float(dfff.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A'])
tac1 = int(round(time.time() * 1000))
time_dict['m1']= tac1 - tic1
print (max_val)
tic2 = int(round(time.time() * 1000))
# Method 2: Use SQL
dfff.registerTempTable("df_table")
max_val = self.sqlContext.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval']
tac2 = int(round(time.time() * 1000))
time_dict['m2']= tac2 - tic2
print (max_val)
tic3 = int(round(time.time() * 1000))
# Method 3: Use groupby()
max_val = dfff.groupby().max('A').collect()[0].asDict()['max(A)']
tac3 = int(round(time.time() * 1000))
time_dict['m3']= tac3 - tic3
print (max_val)
tic4 = int(round(time.time() * 1000))
# Method 4: Convert to RDD
max_val = dfff.select("A").rdd.max()[0]
tac4 = int(round(time.time() * 1000))
time_dict['m4']= tac4 - tic4
print (max_val)
tic5 = int(round(time.time() * 1000))
# Method 5: Use agg()
max_val = dfff.agg({"A": "max"}).collect()[0][0]
tac5 = int(round(time.time() * 1000))
time_dict['m5']= tac5 - tic5
print (max_val)
print time_dict
클러스터의 에지 노드에 대한 결과(밀리초(ms)
작은 DF(ms):{'m1': 7096, 'm2': 205, 'm3': 165, 'm4': 211, 'm5': 180}
더 큰 DF(ms):{'m1': 10260, 'm2': 452, 'm3': 465, 'm4': 916, 'm5': 373}
다른 방법:
df.select(f.max(f.col("A")).alias("MAX")).limit(1).collect()[0].MAX
데이터에 대해 다음과 같은 벤치마크를 얻었습니다.
df.select(f.max(f.col("A")).alias("MAX")).limit(1).collect()[0].MAX
CPU times: user 2.31 ms, sys: 3.31 ms, total: 5.62 ms
Wall time: 3.7 s
df.select("A").rdd.max()[0]
CPU times: user 23.2 ms, sys: 13.9 ms, total: 37.1 ms
Wall time: 10.3 s
df.agg({"A": "max"}).collect()[0][0]
CPU times: user 0 ns, sys: 4.77 ms, total: 4.77 ms
Wall time: 3.75 s
그들은 모두 같은 대답을 합니다.
다음 예에서는 스파크 데이터 프레임 열에서 최대값을 가져오는 방법을 보여 줍니다.
from pyspark.sql.functions import max
df = sql_context.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()
+---+---+
| A| B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
result = df.select([max("A")]).show()
result.show()
+------+
|max(A)|
+------+
| 3.0|
+------+
print result.collect()[0]['max(A)']
3.0
마찬가지로 min, mean 등은 다음과 같이 계산할 수 있습니다.
from pyspark.sql.functions import mean, min, max
result = df.select([mean("A"), min("A"), max("A")])
result.show()
+------+------+------+
|avg(A)|min(A)|max(A)|
+------+------+------+
| 2.0| 1.0| 3.0|
+------+------+------+
먼저 가져오기 줄을 추가합니다.
from pyspark.sql.functions import min, max
데이터 프레임에서 최소 연령 값 찾기
df.agg(min("age")).show()
+--------+
|min(age)|
+--------+
| 29|
+--------+
데이터 프레임에서 수명의 최대값 찾기
df.agg(max("age")).show()
+--------+
|max(age)|
+--------+
| 77|
+--------+
이 체인에 이미 있는 다른 솔루션(@satpremrath)을 사용했습니다.
데이터 프레임에서 최소 연령 값 찾기
df.agg(min("age")).show()
+--------+
|min(age)|
+--------+
| 29|
+--------+
edit: 컨텍스트를 추가합니다.
위의 방법으로 결과를 인쇄하는 동안 나중에 재사용할 변수에 결과를 할당할 때 문제가 발생했습니다.
따라서, 오직 그것만 얻기 위해.int
변수에 할당된 값:
from pyspark.sql.functions import max, min
maxValueA = df.agg(max("A")).collect()[0][0]
maxValueB = df.agg(max("B")).collect()[0][0]
값을 얻으려면 다음 중 하나를 사용합니다.
df1.agg({"x": "max"}).collect()[0][0]
df1.agg({"x": "max"}).head()[0]
df1.agg({"x": "max"}).first()[0]
또는 '분'을 위해 이러한 작업을 수행할 수 있습니다.
from pyspark.sql.functions import min, max
df1.agg(min("id")).collect()[0][0]
df1.agg(min("id")).head()[0]
df1.agg(min("id")).first()[0]
Scala(Spark 2.0.+ 사용)를 사용하는 방법을 궁금해하는 사람이 있을 경우 다음을 수행합니다.
scala> df.createOrReplaceTempView("TEMP_DF")
scala> val myMax = spark.sql("SELECT MAX(x) as maxval FROM TEMP_DF").
collect()(0).getInt(0)
scala> print(myMax)
117
가장 좋은 해결책은 다음과 같은 것을 사용하는 것입니다.head()
당신의 예를 고려해 볼 때:
+---+---+
| A| B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
을 사용하면 같은 을 얻을 수.: agg agg agg 방 max 법하다면음같과은값다을있니습얻.
from pyspark.sql.functions import max df.agg(max(df.A)).head()[0]
다음과 같이 반환됩니다.3.0
가져오기가 올바른지 확인합니다.
from pyspark.sql.functions import max
여기서 사용하는 max 함수는 파이썬의 기본 max 함수가 아닌 pySPARK sql 라이브러리 함수입니다.
pyspark에서 다음을 수행할 수 있습니다.
max(df.select('ColumnName').rdd.flatMap(lambda x: x).collect())
계산 통계만 수행하면 되는 게으른 방법은 다음과 같습니다.
df.write.mode("overwrite").saveAsTable("sampleStats")
Query = "ANALYZE TABLE sampleStats COMPUTE STATISTICS FOR COLUMNS " + ','.join(df.columns)
spark.sql(Query)
df.describe('ColName')
또는
spark.sql("Select * from sampleStats").describe('ColName')
아니면 벌집 껍질을 열 수도 있고,
describe formatted table sampleStats;
특성(최소, 최대, 고유, 널 등)에 통계량이 표시됩니다.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val testDataFrame = Seq(
(1.0, 4.0), (2.0, 5.0), (3.0, 6.0)
).toDF("A", "B")
val (maxA, maxB) = testDataFrame.select(max("A"), max("B"))
.as[(Double, Double)]
.first()
println(maxA, maxB)
이며, (3.0,6.0) 이며, (3.0,6.0) 이며, (3.0,6.0) 이 됩니다.testDataFrame.agg(max($"A"), max($"B")).collect()(0)
. . . .testDataFrame.agg(max($"A"), max($"B")).collect()(0)
[를 합니다. [3.0,6.0]
언급URL : https://stackoverflow.com/questions/33224740/best-way-to-get-the-max-value-in-a-spark-dataframe-column
'programing' 카테고리의 다른 글
oracle "order by" 부품에서 SQL 주입 방지 (0) | 2023.07.21 |
---|---|
이진 트리를 구현하는 방법은 무엇입니까? (0) | 2023.07.21 |
테스트 사례에 사용되는 "setUp" 및 "tearDown" Python 방법 설명 (0) | 2023.07.21 |
스프링 REST 및 PATCH 방법 (0) | 2023.07.21 |
MockMvc가 항상 빈 콘텐츠()를 반환하는 이유는 무엇입니까? (0) | 2023.07.21 |