import findspark
findspark.init()
import pyspark.sql.functions as f
from pyspark.sql.functions import col, lit
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("reddit_nlp") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:3.4.2") \
    .master('yarn') \
    .getOrCreate()
[Ivy dependency log omitted: 21 artifacts for com.johnsnowlabs.nlp:spark-nlp_2.12:3.4.2 and its transitive dependencies resolved from Maven Central (resolve 3291ms, artifacts 3029ms), with harmless "SERVER ERROR: Bad Gateway" messages from the retired dl.bintray.com mirror and duplicate distributed-cache warnings from the YARN client.]
spark
SparkSession - in-memory
df_full = spark.read.parquet('s3://yc910-labdata-3/reddit_eda')
df_full.show(5)
+---------------+--------------+-------------------+---------------+--------------------+---------+----------------+-----------+------+---------+-----------+-----+------------+---------------------+--------+----------+-------+------------+-----+--------+---------+-------+---------------+---------------+
| author|author_premium| author_flair_text|author_fullname| body|collapsed|controversiality|created_utc|gilded|no_follow|quarantined|score|send_replies|total_awards_received|len_body| date| month| score_group|covid|election|economics|finance|gender_equality|racial_equality|
+---------------+--------------+-------------------+---------------+--------------------+---------+----------------+-----------+------+---------+-----------+-----+------------+---------------------+--------+----------+-------+------------+-----+--------+---------+-------+---------------+---------------+
| Lakis9| 0| left-left| t2_3dzc316j|We're on literall...| 0| 0| 1584130541| 0| 1| 0| 1| 1| 0| 10|2020-03-13|2020-03|low_positive| 0| 0| 0| 0| 0| 0|
| LucidMetal| 0| lib-libcenter| t2_3vej5|Wait cringe isn't...| 0| 1| 1585598170| 0| 1| 0| 0| 1| 0| 14|2020-03-30|2020-03|low_positive| 0| 0| 0| 0| 0| 0|
| orjelmort234| 0|authright-authright| t2_4qizezkn|The national soci...| 0| 0| 1584472377| 0| 1| 0| 2| 1| 0| 9|2020-03-17|2020-03|low_positive| 0| 0| 0| 0| 0| 0|
| dumbopinion101| 0| libleft-libleft| t2_4l9e8crm|Okay well let me ...| 0| 0| 1584465986| 0| 1| 0| 1| 1| 0| 96|2020-03-17|2020-03|low_positive| 0| 0| 0| 0| 0| 0|
|vinylmaster2000| 0| centrist-centrist| t2_5tw8opu7|Wouldn't the auth...| 0| 0| 1584476000| 0| 0| 0| 14| 1| 0| 27|2020-03-17|2020-03|low_positive| 0| 0| 0| 0| 0| 0|
+---------------+--------------+-------------------+---------------+--------------------+---------+----------------+-----------+------+---------+-----------+-----+------------+---------------------+--------+----------+-------+------------+-----+--------+---------+-------+---------------+---------------+
only showing top 5 rows
df_full.printSchema()
root
 |-- author: string (nullable = true)
 |-- author_premium: integer (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- author_fullname: string (nullable = true)
 |-- body: string (nullable = true)
 |-- collapsed: integer (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: long (nullable = true)
 |-- gilded: long (nullable = true)
 |-- no_follow: integer (nullable = true)
 |-- quarantined: integer (nullable = true)
 |-- score: long (nullable = true)
 |-- send_replies: integer (nullable = true)
 |-- total_awards_received: long (nullable = true)
 |-- len_body: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- month: string (nullable = true)
 |-- score_group: string (nullable = true)
 |-- covid: integer (nullable = true)
 |-- election: integer (nullable = true)
 |-- economics: integer (nullable = true)
 |-- finance: integer (nullable = true)
 |-- gender_equality: integer (nullable = true)
 |-- racial_equality: integer (nullable = true)
df_full.select("len_body").describe().show()
+-------+------------------+
|summary| len_body|
+-------+------------------+
| count| 7753637|
| mean| 22.15030701076153|
| stddev|18.300381640558925|
| min| 6|
| max| 100|
+-------+------------------+
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
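## Note: sample(fraction=0.001) keeps each row independently with probability 0.001, so the realized sample
## size is only approximately 0.1% of the ~7.75M comments (7,881 on this run, per the figure title below).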
df_small = df_full.sample(fraction=0.001, seed=502).toPandas()
plt.figure(figsize=(12,10))
pal = sns.color_palette(palette="coolwarm", n_colors=2)[1::-1]
sns.set(style = "white") ## Set a style
plt.rcParams.update({'font.family':'serif'})
plt.hist("len_body", data = df_small, bins = range(1,102,5), edgecolor="black", color=pal[1], alpha=0.5)
### Set a title, axis label, legend and rotate the xticks
plt.title(label="Figure 1. The Distribution of the Length of the Post Body \nSample Size = 7881 (0.1%)", fontsize = 20, fontweight = "bold", y = 1.01)
plt.xlabel(xlabel = "Length of the Post Body", fontsize = 17, fontweight = "bold")
plt.ylabel(ylabel = "Number of Posts", fontsize = 17, fontweight = "bold")
plt.xticks(fontsize = 15)
plt.yticks(fontsize = 15)
plt.show()
To clean the data, a pipeline was constructed that strips non-word characters, removes stop words, and then stems or lemmatizes the tokens. When running a Stemmer, an unwanted pattern was noticed: the trailing character "e" was stripped from many words (e.g., "cringe" became "cring"), so a Lemmatizer was used instead of a Stemmer to reduce each word to its base form. And since some posts contain URLs, a regex was first applied to remove them.
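As a quick self-check of that URL regex (a standalone sketch using Python's re module on an invented comment; the Spark pipeline below applies the same pattern through regexp_replace), the URL is dropped together with a surrounding space or parenthesis:
import re
pattern = r"[\( ](http\S+)[\) ]"  # same pattern used on the 'body' column below
print(re.sub(pattern, " ", "see https://example.com/poll for details"))
# -> 'see for details'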
!/mnt/miniconda/bin/pip install sparknlp  # note: "sparknlp" on PyPI is a thin wrapper that pulls in the actual spark-nlp package (see the dependency line below)
Requirement already satisfied: sparknlp in /mnt/miniconda/lib/python3.7/site-packages (1.0.0)
Requirement already satisfied: numpy in /mnt/miniconda/lib/python3.7/site-packages (from sparknlp) (1.21.2)
Requirement already satisfied: spark-nlp in /mnt/miniconda/lib/python3.7/site-packages (from sparknlp) (3.4.3)
from pyspark.ml import Pipeline
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
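# Fit-on-empty trick: a one-row empty DataFrame lets annotators that require fitting (the Tokenizer below) be fit without scanning the real data.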
empty_df = spark.createDataFrame([['']]).toDF("body")
df_full = df_full.withColumn("body", f.regexp_replace("body", r"[\( ](http\S+)[\) ]", " "))
documentAssembler = DocumentAssembler() \
.setInputCol('body') \
.setOutputCol('document')
cleanUpPatterns = [r"[^a-zA-Z\s]+"]
documentNormalizer = DocumentNormalizer() \
.setInputCols("document") \
.setOutputCol("normalizedDocument") \
.setAction("clean") \
.setPatterns(cleanUpPatterns) \
.setReplacement(" ") \
.setPolicy("pretty_all") \
.setLowercase(True)
sentenceDetector = SentenceDetector() \
.setInputCols(["normalizedDocument"]) \
.setOutputCol("sentence")
tokenizer = Tokenizer() \
.setInputCols(["sentence"]) \
.setOutputCol("stop_token") \
.fit(empty_df)
stopWordsCleaner = StopWordsCleaner.pretrained() \
.setInputCols("stop_token") \
.setOutputCol("stop_body") \
.setCaseSensitive(False)
stemmer = Stemmer() \
.setInputCols(["stop_body"]) \
.setOutputCol("stem_body")
lemmatizer = LemmatizerModel.pretrained() \
.setInputCols(["stop_body"]) \
.setOutputCol("lemma_body")
tokenAssembler_stem = TokenAssembler() \
.setInputCols(["sentence", "stem_body"]) \
.setOutputCol("clean_text")
tokenAssembler_lemma = TokenAssembler() \
.setInputCols(["sentence", "lemma_body"]) \
.setOutputCol("clean_text")
stopwords_en download started this may take some time. Approximate size to download 2.9 KB
Download done! Loading the resource. [OK!]
lemma_antbnc download started this may take some time. Approximate size to download 907.6 KB
Download done! Loading the resource. [OK!]
cleanPipeline_stem = Pipeline() \
.setStages([
documentAssembler,
documentNormalizer,
sentenceDetector,
tokenizer,
stopWordsCleaner,
stemmer,
tokenAssembler_stem])
df_clean_stem = cleanPipeline_stem.fit(df_full).transform(df_full)
df_clean_stem.select("clean_text").show(5,truncate=False)
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|clean_text |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[[document, 0, 24, liter exact point compass, [sentence -> 0], []]] |
|[[document, 0, 42, wait cring isn opposit retard don make sens, [sentence -> 0], []]] |
|[[document, 0, 49, nation socialist centri certerleft italian fascist, [sentence -> 0], []]] |
|[[document, 0, 235, fair bit socialist thought reddit don reddit minut hear rant femin bad male rape problem fals accus horribl todai men opt pregnanc gai peopl treat person annoi word reddit brocialist don lib righter unpopularopinion alt circl jerk point, [sentence -> 0], []]]|
|[[document, 0, 89, wouldn authright readi marriag lower ag limit marriag depend religion marri point hinduism, [sentence -> 0], []]] |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 5 rows
cleanPipeline_lemma = Pipeline() \
.setStages([
documentAssembler,
documentNormalizer,
sentenceDetector,
tokenizer,
stopWordsCleaner,
lemmatizer,
tokenAssembler_lemma])
df_clean_lemma = cleanPipeline_lemma.fit(df_full).transform(df_full)
df_clean_lemma.select("clean_text").show(5,truncate=False)
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|clean_text |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[[document, 0, 28, literally exact point compass, [sentence -> 0], []]] |
|[[document, 0, 47, wait cringe isn opposite retarded don make sense, [sentence -> 0], []]] |
|[[document, 0, 52, national socialist centris certerleft italian fascist, [sentence -> 0], []]] |
|[[document, 0, 255, fair bit socialistic think reddit don reddit minute hear rant feminism bad male rape problem false accusation horrible today man opt pregnancy gay people treat personality annoy word reddit brocialist don lib righters unpopularopinion alt circle jerk point, [sentence -> 0], []]]|
|[[document, 0, 90, wouldn authright ready marriage low age limit marriage depend religion marry point hinduism, [sentence -> 0], []]] |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 5 rows
df_clean_lemma.printSchema()
root
 |-- author: string (nullable = true)
 |-- author_premium: integer (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- author_fullname: string (nullable = true)
 |-- body: string (nullable = true)
 |-- collapsed: integer (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: long (nullable = true)
 |-- gilded: long (nullable = true)
 |-- no_follow: integer (nullable = true)
 |-- quarantined: integer (nullable = true)
 |-- score: long (nullable = true)
 |-- send_replies: integer (nullable = true)
 |-- total_awards_received: long (nullable = true)
 |-- len_body: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- month: string (nullable = true)
 |-- score_group: string (nullable = true)
 |-- covid: integer (nullable = true)
 |-- election: integer (nullable = true)
 |-- economics: integer (nullable = true)
 |-- finance: integer (nullable = true)
 |-- gender_equality: integer (nullable = true)
 |-- racial_equality: integer (nullable = true)
 |-- document: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
[normalizedDocument, sentence, stop_token, stop_body, lemma_body, and clean_text repeat the same array-of-annotation struct as document]
df_clean = df_clean_lemma.drop("normalizedDocument", "sentence", "stop_token", "stop_body")
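# Each annotator column is an array of annotation structs; '.result' pulls out the text field, and explode() yields one row per cleaned sentence.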
df_clean = df_clean.withColumn("clean_text", f.explode(col("clean_text").result))
df_clean = df_clean.withColumn("word_list", col("lemma_body").result)
df_clean.show(3)
+------------+--------------+-------------------+---------------+--------------------+---------+----------------+-----------+------+---------+-----------+-----+------------+---------------------+--------+----------+-------+------------+-----+--------+---------+-------+---------------+---------------+--------------------+--------------------+--------------------+--------------------+
| author|author_premium| author_flair_text|author_fullname| body|collapsed|controversiality|created_utc|gilded|no_follow|quarantined|score|send_replies|total_awards_received|len_body| date| month| score_group|covid|election|economics|finance|gender_equality|racial_equality| document| lemma_body| clean_text| word_list|
+------------+--------------+-------------------+---------------+--------------------+---------+----------------+-----------+------+---------+-----------+-----+------------+---------------------+--------+----------+-------+------------+-----+--------+---------+-------+---------------+---------------+--------------------+--------------------+--------------------+--------------------+
| Lakis9| 0| left-left| t2_3dzc316j|We're on literall...| 0| 0| 1584130541| 0| 1| 0| 1| 1| 0| 10|2020-03-13|2020-03|low_positive| 0| 0| 0| 0| 0| 0|[[document, 0, 53...|[[token, 9, 17, l...|literally exact p...|[literally, exact...|
| LucidMetal| 0| lib-libcenter| t2_3vej5|Wait cringe isn't...| 0| 1| 1585598170| 0| 1| 0| 0| 1| 0| 14|2020-03-30|2020-03|low_positive| 0| 0| 0| 0| 0| 0|[[document, 0, 79...|[[token, 0, 3, wa...|wait cringe isn o...|[wait, cringe, is...|
|orjelmort234| 0|authright-authright| t2_4qizezkn|The national soci...| 0| 0| 1584472377| 0| 1| 0| 2| 1| 0| 9|2020-03-17|2020-03|low_positive| 0| 0| 0| 0| 0| 0|[[document, 0, 69...|[[token, 4, 11, n...|national socialis...|[national, social...|
+------------+--------------+-------------------+---------------+--------------------+---------+----------------+-----------+------+---------+-----------+-----+------------+---------------------+--------+----------+-------+------------+-----+--------+---------+-------+---------------+---------------+--------------------+--------------------+--------------------+--------------------+
only showing top 3 rows
From the results of CountVectorizer and TF-IDF, the most common and important words include "people", "government", "auth"/"lib", and "right"/"left". Therefore, dummy variables are created for government, the economic axis (eco_left / eco_right / eco_centr), and the authority axis (auth / lib).
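As a minimal sketch of how these flags are built (toy rows invented for illustration; the real columns are added to df_clean further below), a case-insensitive rlike match is turned into a 1/0 indicator with f.when:
toy = spark.createDataFrame([("the government is huge",), ("taxation is theft",)], ["clean_text"])
toy.withColumn("government", f.when(col("clean_text").rlike("(?i)government"), 1).otherwise(0)).show(truncate=False)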
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol="word_list", outputCol="word_vec", minDF=100)
model = cv.fit(df_clean)
df_count_vec = model.transform(df_clean)
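# Sum the sparse count vectors element-wise across all rows to obtain a corpus-wide count for every vocabulary term.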
df_counts = df_count_vec.select("word_vec").rdd.map(lambda row: row['word_vec'].toArray()).reduce(lambda x,y: [x[i]+y[i] for i in range(len(y))])
import numpy as np
vocablist = model.vocabulary
d = {'vocabList':vocablist,'counts':df_counts}
df_cv = spark.createDataFrame(np.array(list(d.values())).T.tolist(),list(d.keys()))
df_cv.show(20)
+----------+--------+
| vocabList| counts|
+----------+--------+
| people|985283.0|
| don|704019.0|
| make|604063.0|
| base|550588.0|
| leave|364049.0|
| gt|347839.0|
| thing|342876.0|
| good|290217.0|
| time|288241.0|
| bad|277712.0|
| fuck|272101.0|
| shit|253419.0|
| count|236652.0|
| libleft|231883.0|
| info|227439.0|
|government|226895.0|
| work|223978.0|
| doesn|223250.0|
| yeah|223220.0|
| man|221568.0|
+----------+--------+
only showing top 20 rows
from pyspark.ml.feature import HashingTF, IDF
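# Unlike CountVectorizer, HashingTF stores no vocabulary: each token is hashed into one of numFeatures buckets, so unrelated words can collide.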
hashingTF = HashingTF(numFeatures=10000, inputCol="word_list", outputCol="TF")
df_tf = hashingTF.transform(df_clean)
df_tf.select("TF").show(5, truncate=False)
df_tf.cache()
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|TF |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(10000,[4416,4800,6491,6916],[1.0,1.0,1.0,1.0]) |
|(10000,[3167,3525,4344,5056,5447,5477,6329,8024],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|(10000,[3535,4704,6887,7910,8200,8493],[1.0,1.0,1.0,1.0,1.0,1.0]) |
|(10000,[150,383,600,1623,1698,1989,2335,2569,2619,3059,3195,3357,3519,3753,3784,4148,4191,4263,4317,4344,4416,4842,5092,5255,5885,6204,6859,7956,8133,8333,8344,8750,8877,9806,9841],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,3.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|(10000,[1491,3543,3549,4416,4445,6229,6557,8105,8411,8433,8791,9033],[1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 5 rows
DataFrame[author: string, author_premium: int, author_flair_text: string, author_fullname: string, body: string, collapsed: int, controversiality: bigint, created_utc: bigint, gilded: bigint, no_follow: int, quarantined: int, score: bigint, send_replies: int, total_awards_received: bigint, len_body: int, date: date, month: string, score_group: string, covid: int, election: int, economics: int, finance: int, gender_equality: int, racial_equality: int, document: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, lemma_body: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, clean_text: string, word_list: array<string>, TF: vector]
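# IDF down-weights terms that appear in many documents; Spark computes idf(t) = log((N + 1) / (df(t) + 1)), where N is the number of documents and df(t) the number containing term t.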
_IDF = IDF(inputCol="TF", outputCol="IDF")
IDF_model = _IDF.fit(df_tf)
df_idf = IDF_model.transform(df_tf)
df_tf.select("word_list","TF").show(5, truncate=False)
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|word_list |TF |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[literally, exact, point, compass] |(10000,[4416,4800,6491,6916],[1.0,1.0,1.0,1.0]) |
|[wait, cringe, isn, opposite, retarded, don, make, sense] |(10000,[3167,3525,4344,5056,5447,5477,6329,8024],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|[national, socialist, centris, certerleft, italian, fascist] |(10000,[3535,4704,6887,7910,8200,8493],[1.0,1.0,1.0,1.0,1.0,1.0]) |
|[fair, bit, socialistic, think, reddit, don, reddit, minute, hear, rant, feminism, bad, male, rape, problem, false, accusation, horrible, today, man, opt, pregnancy, gay, people, treat, personality, annoy, word, reddit, brocialist, don, lib, righters, unpopularopinion, alt, circle, jerk, point]|(10000,[150,383,600,1623,1698,1989,2335,2569,2619,3059,3195,3357,3519,3753,3784,4148,4191,4263,4317,4344,4416,4842,5092,5255,5885,6204,6859,7956,8133,8333,8344,8750,8877,9806,9841],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,3.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|[wouldn, authright, ready, marriage, low, age, limit, marriage, depend, religion, marry, point, hinduism] |(10000,[1491,3543,3549,4416,4445,6229,6557,8105,8411,8433,8791,9033],[1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 5 rows
df_idf.select("word_list","IDF").show(5, truncate=False)
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|word_list |IDF |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[literally, exact, point, compass] |(10000,[4416,4800,6491,6916],[3.7763417746184738,6.168686564916481,3.9764661681648374,4.394615118038536]) |
|[wait, cringe, isn, opposite, retarded, don, make, sense] |(10000,[3167,3525,4344,5056,5447,5477,6329,8024],[5.154785740359509,2.642971505746815,2.4940589860798874,3.696062053965959,4.801696711506972,4.650771105072652,5.066465422447472,5.803867148936993]) |
|[national, socialist, centris, certerleft, italian, fascist] |(10000,[3535,4704,6887,7910,8200,8493],[4.509470031471585,4.296500486289015,6.686955445281422,5.75074847095795,5.121863942750001,4.917007034460635]) |
|[fair, bit, socialistic, think, reddit, don, reddit, minute, hear, rant, feminism, bad, male, rape, problem, false, accusation, horrible, today, man, opt, pregnancy, gay, people, treat, personality, annoy, word, reddit, brocialist, don, lib, righters, unpopularopinion, alt, circle, jerk, point]|(10000,[150,383,600,1623,1698,1989,2335,2569,2619,3059,3195,3357,3519,3753,3784,4148,4191,4263,4317,4344,4416,4842,5092,5255,5885,6204,6859,7956,8133,8333,8344,8750,8877,9806,9841],[5.284138567598665,6.469540189901528,6.341184100981741,6.649184457969705,4.350492945494792,4.962866696895858,3.8667857510980403,3.5426417201196214,6.51615139594801,5.490596403215705,8.90277063635471,5.958781173965312,4.312627402599648,9.320646404949214,5.125830782612813,6.0180017262462195,6.015249236682902,2.2564035329050056,8.161466002813793,4.988117972159775,3.7763417746184738,4.5986799011386665,6.855139005611516,7.566322499685965,4.549815256758653,13.365660415060486,9.44965940828866,3.428667891373204,5.701622174279601,7.745507618989789,5.172196703170739,5.725230853349981,7.550246971113403,5.422026948121294,3.735299195896178])|
|[wouldn, authright, ready, marriage, low, age, limit, marriage, depend, religion, marry, point, hinduism] |(10000,[1491,3543,3549,4416,4445,6229,6557,8105,8411,8433,8791,9033],[5.437598222455817,3.853723840776424,4.4799371875414495,3.7763417746184738,6.019380817325664,12.003928731871937,6.487602595520851,8.035080333330189,4.783286175711428,6.244915760771949,5.320649469603727,5.264485629697272]) |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 5 rows
df_small = df_clean.limit(100000)
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol="word_list", outputCol="word_vec", minDF=100)
model = cv.fit(df_small)
df_count_vec = model.transform(df_small)
df_counts = df_count_vec.select("word_vec").rdd.map(lambda row: row['word_vec'].toArray()).reduce(lambda x,y: [x[i]+y[i] for i in range(len(y))])
import numpy as np
vocablist = model.vocabulary
d = {'vocabList':vocablist,'counts':df_counts}
df_cv = spark.createDataFrame(np.array(list(d.values())).T.tolist(),list(d.keys()))
df_cv.show(100)
+----------+-------+
| vocabList| counts|
+----------+-------+
| people|11504.0|
| don| 8827.0|
| make| 7438.0|
| leave| 5203.0|
| base| 2264.0|
| gt| 5339.0|
| thing| 4258.0|
| good| 3480.0|
| time| 3667.0|
| fuck| 3399.0|
|government| 2915.0|
| lib| 2985.0|
| bad| 2919.0|
| shit| 3005.0|
| auth| 2745.0|
| yeah| 2832.0|
| doesn| 2915.0|
| libleft| 3287.0|
| flair| 2733.0|
| state| 2645.0|
| work| 2880.0|
| libright| 2775.0|
| ve| 2703.0|
| man| 2192.0|
| isn| 2514.0|
| country| 2180.0|
| authright| 2868.0|
| hate| 2590.0|
| point| 2219.0|
| white| 2145.0|
| pretty| 2225.0|
| fucking| 2481.0|
| post| 2828.0|
| info| 46.0|
| centrist| 2458.0|
| call| 2540.0|
| think| 2045.0|
| black| 1767.0|
| ll| 2315.0|
| didn| 1789.0|
| meme| 2189.0|
| give| 1877.0|
| guy| 2026.0|
| count| 426.0|
| gun| 1389.0|
| free| 2062.0|
| political| 1831.0|
| agree| 1840.0|
| purple| 1162.0|
| mean| 1844.0|
| year| 1739.0|
| lot| 1765.0|
| lol| 1583.0|
| kill| 1569.0|
| nazi| 1798.0|
| back| 1481.0|
| support| 1530.0|
| life| 1548.0|
| literally| 1697.0|
| talk| 1684.0|
| live| 1625.0|
| aren| 1529.0|
| pay| 1645.0|
| money| 1842.0|
| bot| 335.0|
| world| 1556.0|
| real| 1468.0|
| feel| 1453.0|
| compass| 1258.0|
| put| 1510.0|
| trump| 1397.0|
| change| 1415.0|
| comment| 1618.0|
| authleft| 1539.0|
| power| 1319.0|
| vote| 1546.0|
| tax| 1507.0|
| american| 1394.0|
| wrong| 1448.0|
| care| 1530.0|
| long| 1494.0|
| love| 1459.0|
| reason| 1294.0|
| find| 1905.0|
| win| 1594.0|
| quadrant| 1251.0|
| start| 1380.0|
| part| 1336.0|
| day| 1377.0|
| stop| 1328.0|
| party| 947.0|
| joke| 1528.0|
| big| 1260.0|
| right| 1276.0|
| true| 1397.0|
| person| 1312.0|
|understand| 1161.0|
| word| 3002.0|
| society| 1272.0|
| reply| 227.0|
+----------+-------+
only showing top 100 rows
df_clean = df_clean.withColumn("government", f.when(df_clean["clean_text"].rlike("(?i)government"), 1).otherwise(0))
df_clean = df_clean.withColumn("eco_left", f.when(df_clean["clean_text"].rlike("(?i)left"), 1).otherwise(0))
df_clean = df_clean.withColumn("eco_right", f.when(df_clean["clean_text"].rlike("(?i)right"), 1).otherwise(0))
df_clean = df_clean.withColumn("eco_centr", f.when(df_clean["clean_text"].rlike("(?i)centr"), 1).otherwise(0))
df_clean = df_clean.withColumn("auth", f.when(df_clean["clean_text"].rlike("(?i)auth"), 1).otherwise(0))
df_clean = df_clean.withColumn("lib", f.when(df_clean["clean_text"].rlike("(?i)lib"), 1).otherwise(0))
df_clean = df_clean.drop("eco_dir", "auth_lib")  # drop() silently ignores these names if the columns were never created
df_clean.persist()
DataFrame[author: string, author_premium: int, author_flair_text: string, author_fullname: string, body: string, collapsed: int, controversiality: bigint, created_utc: bigint, gilded: bigint, no_follow: int, quarantined: int, score: bigint, send_replies: int, total_awards_received: bigint, len_body: int, date: date, month: string, score_group: string, covid: int, election: int, economics: int, finance: int, gender_equality: int, racial_equality: int, document: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, lemma_body: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, clean_text: string, word_list: array<string>, government: int, eco_left: int, eco_right: int, eco_centr: int, auth: int, lib: int]
df_clean.groupBy("government").count().show()
+----------+-------+
|government| count|
+----------+-------+
| 1| 193721|
| 0|7555619|
+----------+-------+
df_clean.groupBy("eco_left").count().show()
+--------+-------+
|eco_left| count|
+--------+-------+
| 1| 398868|
| 0|7350472|
+--------+-------+
df_clean.groupBy("eco_right").count().show()
+---------+-------+
|eco_right| count|
+---------+-------+
| 1| 443250|
| 0|7306090|
+---------+-------+
df_clean.groupBy("eco_centr").count().show()
+---------+-------+
|eco_centr| count|
+---------+-------+
| 1| 174703|
| 0|7574637|
+---------+-------+
df_clean.groupBy("auth").count().show()
+----+-------+
|auth| count|
+----+-------+
| 1| 508159|
| 0|7241181|
+----+-------+
df_clean.groupBy("lib").count().show()
+---+-------+
|lib| count|
+---+-------+
| 1| 760253|
| 0|6989087|
+---+-------+
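# Sentiment scoring: embed each cleaned comment with the Universal Sentence Encoder, then classify it with a SentimentDL model pretrained on Twitter data.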
MODEL_NAME='sentimentdl_use_twitter'
documentAssembler = DocumentAssembler()\
.setInputCol("clean_text")\
.setOutputCol("clean_document")
use = UniversalSentenceEncoder.pretrained(name="tfhub_use", lang="en")\
.setInputCols(["clean_document"])\
.setOutputCol("sentence_embeddings")
sentimentdl = SentimentDLModel.pretrained(name=MODEL_NAME, lang="en")\
.setInputCols(["sentence_embeddings"])\
.setOutputCol("sentiment")
nlpPipeline = Pipeline(
stages = [
documentAssembler,
use,
sentimentdl
])
tfhub_use download started this may take some time. Approximate size to download 923.7 MB
Download done! Loading the resource.
[OK!]
sentimentdl_use_twitter download started this may take some time. Approximate size to download 11.4 MB
Download done! Loading the resource.
[OK!]
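# Every stage is pretrained, so fitting on an empty one-column DataFrame merely assembles the PipelineModel; no training touches the data.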
empty_df = spark.createDataFrame([['']]).toDF("clean_text")
pipelineModel = nlpPipeline.fit(empty_df)
df_sentiment = pipelineModel.transform(df_clean)
df_sentiment.show(3)
+------------+--------------+-------------------+---------------+--------------------+---------+----------------+-----------+------+---------+-----------+-----+------------+---------------------+--------+----------+-------+------------+-----+--------+---------+-------+---------------+---------------+--------------------+--------------------+--------------------+--------------------+----------+--------+---------+---------+----+---+--------------------+--------------------+--------------------+
| author|author_premium| author_flair_text|author_fullname| body|collapsed|controversiality|created_utc|gilded|no_follow|quarantined|score|send_replies|total_awards_received|len_body| date| month| score_group|covid|election|economics|finance|gender_equality|racial_equality| document| lemma_body| clean_text| word_list|government|eco_left|eco_right|eco_centr|auth|lib| clean_document| sentence_embeddings| sentiment|
+------------+--------------+-------------------+---------------+--------------------+---------+----------------+-----------+------+---------+-----------+-----+------------+---------------------+--------+----------+-------+------------+-----+--------+---------+-------+---------------+---------------+--------------------+--------------------+--------------------+--------------------+----------+--------+---------+---------+----+---+--------------------+--------------------+--------------------+
| Lakis9| 0| left-left| t2_3dzc316j|We're on literall...| 0| 0| 1584130541| 0| 1| 0| 1| 1| 0| 10|2020-03-13|2020-03|low_positive| 0| 0| 0| 0| 0| 0|[[document, 0, 53...|[[token, 9, 17, l...|literally exact p...|[literally, exact...| 0| 0| 0| 0| 0| 0|[[document, 0, 28...|[[sentence_embedd...|[[category, 0, 28...|
| LucidMetal| 0| lib-libcenter| t2_3vej5|Wait cringe isn't...| 0| 1| 1585598170| 0| 1| 0| 0| 1| 0| 14|2020-03-30|2020-03|low_positive| 0| 0| 0| 0| 0| 0|[[document, 0, 79...|[[token, 0, 3, wa...|wait cringe isn o...|[wait, cringe, is...| 0| 0| 0| 0| 0| 0|[[document, 0, 47...|[[sentence_embedd...|[[category, 0, 47...|
|orjelmort234| 0|authright-authright| t2_4qizezkn|The national soci...| 0| 0| 1584472377| 0| 1| 0| 2| 1| 0| 9|2020-03-17|2020-03|low_positive| 0| 0| 0| 0| 0| 0|[[document, 0, 69...|[[token, 4, 11, n...|national socialis...|[national, social...| 0| 1| 0| 1| 0| 0|[[document, 0, 52...|[[sentence_embedd...|[[category, 0, 52...|
+------------+--------------+-------------------+---------------+--------------------+---------+----------------+-----------+------+---------+-----------+-----+------------+---------------------+--------+----------+-------+------------+-----+--------+---------+-------+---------------+---------------+--------------------+--------------------+--------------------+--------------------+----------+--------+---------+---------+----+---+--------------------+--------------------+--------------------+
only showing top 3 rows
df_sentiment = df_sentiment.withColumn("sentiment", f.explode(f.expr("sentiment.result")))  # replace the annotation column with its exploded string label
df_sentiment.select("clean_text", "sentiment").show(5, truncate=False)
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+
|clean_text |sentiment|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+
|literally exact point compass |positive |
|wait cringe isn opposite retarded don make sense |negative |
|national socialist centris certerleft italian fascist |positive |
|fair bit socialistic think reddit don reddit minute hear rant feminism bad male rape problem false accusation horrible today man opt pregnancy gay people treat personality annoy word reddit brocialist don lib righters unpopularopinion alt circle jerk point|negative |
|wouldn authright ready marriage low age limit marriage depend religion marry point hinduism |positive |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+
only showing top 5 rows
df_sentiment.groupBy('author_flair_text').pivot('sentiment').count().show()
+--------------------+--------+-------+--------+
| author_flair_text|negative|neutral|positive|
+--------------------+--------+-------+--------+
| auth-authcenter| 232500| 30307| 475966|
| authright-authright| 234551| 31332| 355756|
| right-right| 232587| 29867| 356213|
| left-left| 241310| 31457| 366059|
| authleft-authleft| 155163| 21910| 236738|
| centrist-centrist| 309765| 41290| 503516|
| lib-libcenter| 333282| 44741| 542208|
| libright2-libright| 104847| 14605| 167727|
| | 71| 15| 181|
|centrist-grandinq...| 316| 119| 7166|
| libright-libright| 469588| 63979| 772199|
| politicalcompass| null| null| 3|
| centg-centrist| 70571| 9789| 126640|
| libleft-libleft| 402088| 55334| 648092|
+--------------------+--------+-------+--------+
import pandas as pd

df_sent_date = df_sentiment.groupBy("date").pivot('sentiment').count().toPandas()
df_sent_date.date = pd.to_datetime(df_sent_date.date)
df_sent_date["Negative Rate"] = df_sent_date.negative / (df_sent_date.negative + df_sent_date.neutral + df_sent_date.positive)
df_sent_date["Neutral Rate"] = df_sent_date.neutral / (df_sent_date.negative + df_sent_date.neutral + df_sent_date.positive)
df_sent_date["Positive Sentiment Rate"] = df_sent_date.positive / (df_sent_date.negative + df_sent_date.neutral + df_sent_date.positive)
df_sent_date.drop(["negative", "neutral", "positive"], axis=1, inplace=True)
df_sent_date.sort_values(by = "date", inplace=True)
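A quick sanity check that the three daily rates partition the posts; a sketch, assuming every date has at least one post of each sentiment (otherwise the pivot leaves NaN counts):
import numpy as np

# The negative/neutral/positive shares should sum to 1 for every date
rate_cols = ["Negative Rate", "Neutral Rate", "Positive Sentiment Rate"]
assert np.allclose(df_sent_date[rate_cols].sum(axis=1), 1.0)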
df_nq = spark.read.csv('s3://yc910-labdata-3/nasdaq').toPandas()
df_nq.columns = ["date", "Nasdaq Close Price", "volume"]
df_nq.date = pd.to_datetime(df_nq.date)
df_nq["Nasdaq Close Price"] = df_nq["Nasdaq Close Price"].astype("float")
df_nq.volume = df_nq.volume.astype("int")
df_sent_date = df_sent_date.merge(df_nq, how = "left", on = "date")
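Before plotting, a rough numeric look at the relationship; a sketch only (the left merge leaves NaN on non-trading days, hence the dropna):
# Pearson correlation between the daily positive-sentiment rate and the Nasdaq close
print(df_sent_date[["Positive Sentiment Rate", "Nasdaq Close Price"]].dropna().corr())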
## Plot the figure
import matplotlib.pyplot as plt
import seaborn as sns

sns.set_style("white") ## Set a style
pal = sns.color_palette(palette="coolwarm", n_colors=2)[1::-1] ## Generate a color palette
plt.rcParams.update({'font.family':'serif'}) ## Set a font family
### Make the plot. jointplot builds its own figure, so size it with `height`
### rather than a preceding plt.figure() call, which would be left empty.
sns.jointplot(x=df_sent_date["Positive Sentiment Rate"], y=df_sent_date["Nasdaq Close Price"], cmap="Blues", shade=True, kind='kde', height=10)
### Set title, axis labels and legend
plt.title(label = "Figure 1. Contour Plot & Marginal Distribution of \nNasdaq Index and Post Sentiment", fontsize = 20, fontweight = "bold", y = 1.25, x = -3)
plt.yticks(fontsize = 13)
plt.xticks(fontsize = 13)
plt.rc('axes', titlesize=15, labelsize=17)
plt.show()
df_sent_flair = df_sentiment.groupBy("author_flair_text").pivot('sentiment').count().toPandas()
df_sent_flair = df_sent_flair[df_sent_flair.positive > 10000].reset_index(drop=True)
df_sent_flair["pos_rate"] = df_sent_flair.positive / (df_sent_flair.negative + df_sent_flair.neutral + df_sent_flair.positive)
# Bars are sorted by positive sentiment rate, descending
df_sent_flair_sorted = df_sent_flair.sort_values("pos_rate", ascending=False).reset_index(drop=True)
# Angles for the x axis: one bar per flair, spread around the circle
ANGLES = np.linspace(0.05, 2 * np.pi - 0.05, len(df_sent_flair_sorted), endpoint=False)
# Bar heights: positive sentiment rate per flair
LENGTHS = df_sent_flair_sorted["pos_rate"].values
# Author flair labels
REGION = df_sent_flair_sorted["author_flair_text"].values
# Color groups: bucket the ranked flairs into four tiers (ranks 0-2, 3-5, 6-8, rest)
df_sent_flair_sorted['n'] = pd.DataFrame([1 if i <= 2 else 2 if i <= 5 else 3 if i <= 8 else 4 for i in df_sent_flair_sorted.index])
TRACKS_N = df_sent_flair_sorted["n"].values
import matplotlib as mpl
GREY12 = "#1f1f1f"
# Use an ASCII hyphen instead of the unicode minus glyph,
# which some serif fonts do not provide
plt.rc("axes", unicode_minus=False)
# Colors
COLORS = ["#6C5B7B","#C06C84","#F67280","#F8B195"]
# Colormap
cmap = mpl.colors.LinearSegmentedColormap.from_list("my color", COLORS, N=256)
# Normalizer
norm = mpl.colors.Normalize(vmin=TRACKS_N.min(), vmax=TRACKS_N.max())
# Normalized colors. Each number of tracks is mapped to a color in the
# color scale 'cmap'
COLORS = cmap(norm(TRACKS_N))
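To see how the two pieces compose, a tiny illustration with hypothetical values: Normalize rescales a group id onto [0, 1], and indexing the colormap with that fraction returns one RGBA row per bar.
demo_norm = mpl.colors.Normalize(vmin=1, vmax=4)
print(demo_norm(2.5))        # 0.5 -- halfway between vmin and vmax
print(cmap(demo_norm(2.5)))  # the mid-scale (r, g, b, a) tuple from the custom colormap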
from textwrap import wrap
fig, ax = plt.subplots(figsize=(9, 12.6), subplot_kw={"projection": "polar"})
# Set background color to white, both axis and figure.
fig.patch.set_facecolor("white")
ax.set_facecolor("white")
ax.set_theta_offset(1.2 * np.pi / 2)
ax.set_ylim(bottom=0.5, top=0.7)
# Add geometries to the plot
# Add bars representing the positive sentiment rate per flair
ax.bar(x=ANGLES, height=LENGTHS - 0.5, bottom=0.5,
       color=COLORS, alpha=0.9, width=0.5, zorder=11)
# Add dashed vertical lines. These are just references
ax.vlines(ANGLES, 0.5, 0.7, color=GREY12, ls=(0, (4, 4)), zorder=12)
# Wrap the flair labels so they fit around the circle
REGION = ["\n".join(wrap(r, 5, break_long_words=False)) for r in REGION]
# Set the labels
ax.set_xticks(ANGLES)
ax.set_xticklabels(REGION, size=13);
ax.set_title(label = "Figure 2. Radar Plot of Positive Sentiment Rate \nfor Different Author Flair Types", fontsize = 20, fontweight = "bold")
df_sent_score = df_sentiment.select("score", "sentiment").sample(fraction=0.005, seed=502).toPandas()
plt.figure(figsize=(12,10))
pal = [sns.color_palette(palette="coolwarm", n_colors=3)[0],
sns.color_palette(palette="coolwarm", n_colors=3)[2],
sns.color_palette(palette="coolwarm", n_colors=3)[1]]
sns.set(style = "white") ## Set a style
plt.rcParams.update({'font.family':'serif'})
sns.boxenplot(x="score", y="sentiment", palette=pal, scale="linear", data=df_sent_score, orient="h")
### Set a title, axis label, legend and rotate the xticks
plt.title(label="Figure 3. The Distribution of the Post Score of Different Sentiments", fontsize = 20, fontweight = "bold", y = 1.01)
plt.xlabel(xlabel = "Post Score", fontsize = 17, fontweight = "bold")
plt.ylabel(ylabel = "Post Sentiment", fontsize = 17, fontweight = "bold")
plt.xticks(fontsize = 15)
plt.yticks(fontsize = 15)
plt.show()
COVID- and Racial Equality-related posts tend to have a larger proportion of negative sentiment, while Economics- and Gender Equality-related posts are likely to be more positive.
summary_1 = df_sentiment.groupBy("month").pivot('sentiment').count().toPandas()
summary_1["Negative Rate"] = summary_1.negative / (summary_1.negative + summary_1.neutral + summary_1.positive)
summary_1["Neutral Rate"] = summary_1.neutral / (summary_1.negative + summary_1.neutral + summary_1.positive)
summary_1["Positive Sentiment Rate"] = summary_1.positive / (summary_1.negative + summary_1.neutral + summary_1.positive)
summary_1.sort_values(by = "month", inplace=True)
summary_1.reset_index(drop = True, inplace = True)
summary_1
 | month | negative | neutral | positive | Negative Rate | Neutral Rate | Positive Sentiment Rate |
---|---|---|---|---|---|---|---|
0 | 2019-07 | 2975 | 440 | 5597 | 0.330115 | 0.048824 | 0.621061 |
1 | 2019-08 | 4511 | 637 | 7821 | 0.347829 | 0.049117 | 0.603053 |
2 | 2019-09 | 5745 | 825 | 9875 | 0.349346 | 0.050167 | 0.600486 |
3 | 2019-10 | 11570 | 1844 | 21039 | 0.335820 | 0.053522 | 0.610658 |
4 | 2019-11 | 16838 | 2521 | 31686 | 0.329866 | 0.049388 | 0.620746 |
5 | 2019-12 | 29962 | 4754 | 57539 | 0.324774 | 0.051531 | 0.623695 |
6 | 2020-01 | 49450 | 7678 | 95539 | 0.323908 | 0.050292 | 0.625800 |
7 | 2020-02 | 53250 | 8385 | 104252 | 0.321002 | 0.050546 | 0.628452 |
8 | 2020-03 | 88029 | 12979 | 154286 | 0.344814 | 0.050839 | 0.604346 |
9 | 2020-04 | 130314 | 18905 | 227494 | 0.345924 | 0.050184 | 0.603892 |
10 | 2020-05 | 138654 | 19872 | 236374 | 0.351112 | 0.050322 | 0.598567 |
11 | 2020-06 | 162019 | 23114 | 276784 | 0.350753 | 0.050039 | 0.599207 |
12 | 2020-07 | 181637 | 24847 | 310148 | 0.351579 | 0.048094 | 0.600327 |
13 | 2020-08 | 193471 | 26714 | 319155 | 0.358718 | 0.049531 | 0.591751 |
14 | 2020-09 | 169329 | 23323 | 280514 | 0.357864 | 0.049291 | 0.592845 |
15 | 2020-10 | 188725 | 24286 | 303022 | 0.365723 | 0.047063 | 0.587214 |
16 | 2020-11 | 196873 | 26109 | 316190 | 0.365140 | 0.048424 | 0.586436 |
17 | 2020-12 | 186062 | 24157 | 300431 | 0.364363 | 0.047306 | 0.588331 |
18 | 2021-01 | 212037 | 27106 | 321656 | 0.378098 | 0.048335 | 0.573567 |
19 | 2021-02 | 150459 | 19294 | 233712 | 0.372917 | 0.047821 | 0.579262 |
20 | 2021-03 | 157980 | 20523 | 251985 | 0.366979 | 0.047674 | 0.585347 |
21 | 2021-04 | 152552 | 18954 | 230276 | 0.379688 | 0.047175 | 0.573137 |
22 | 2021-05 | 166195 | 19663 | 243809 | 0.386800 | 0.045763 | 0.567437 |
23 | 2021-06 | 138002 | 17816 | 219279 | 0.367910 | 0.047497 | 0.584593 |
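The table shows the negative share drifting from roughly 0.32-0.35 over 2019-2020 up to 0.37-0.39 by mid-2021; a minimal line-plot sketch of that drift (styling is illustrative, not the notebook's figure style):
# Plot the monthly negative-sentiment share over time
plt.figure(figsize=(12, 6))
plt.plot(summary_1["month"], summary_1["Negative Rate"], marker="o")
plt.xticks(rotation=45)
plt.xlabel("Month")
plt.ylabel("Negative Rate")
plt.title("Monthly Share of Negative-Sentiment Posts")
plt.tight_layout()
plt.show()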
summary_2 = df_sentiment.groupBy("sentiment").agg(f.sum("covid"),
f.sum("election"),
f.sum("economics"),
f.sum("finance"),
f.sum("gender_equality"),
f.sum("racial_equality")).toPandas()
summary_2 = summary_2.rename(columns = {"sum(covid)":"covid",
"sum(election)":"election",
"sum(economics)":"economics",
"sum(finance)":"finance",
"sum(gender_equality)":"gender_equality",
"sum(racial_equality)":"racial_equality"})
summary_2
 | sentiment | covid | election | economics | finance | gender_equality | racial_equality |
---|---|---|---|---|---|---|---|
0 | positive | 12548 | 20562 | 62773 | 13928 | 43667 | 98744 |
1 | neutral | 1813 | 2516 | 4979 | 1517 | 2021 | 15071 |
2 | negative | 20020 | 21120 | 52316 | 13579 | 14229 | 157961 |
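Normalizing those counts within each topic makes the earlier claim concrete; a short sketch (each column below sums to 1):
# Share of each sentiment label within each topic
topic_shares = summary_2.set_index("sentiment")
print(topic_shares / topic_shares.sum())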
df_sentiment.write.parquet("s3://yc910-labdata-3/reddit_nlp", mode="overwrite")
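A quick read-back before shutting the session down; a sketch, assuming the write above completed:
# Verify the parquet output is readable and non-empty
print(spark.read.parquet("s3://yc910-labdata-3/reddit_nlp").count())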
spark.stop()