טעינת נתונים מ- SQL Server ל- Elasticsearch
Elasticsearch, למי שלא מכיר, הוא מנוע אינדוקס פופולרי מאד המבוסס על מנוע האינדוקס של Lucene. מדובר למעשה ב- Document DB עם דגש חזק מאד על חיפוש טקסטואלי במידע (למרות שככל שעובר הזמן מתווספים לו פיצ’רים של document db גנרי, בכלל זה גם אגרגציות וכו’).
בפוסט הזה אני אסביר איך אפשר לדאוג לזרימת נתונים שוטפת מ- SQL Server ל- Elasticsearch.
למה שנרצה דבר כזה?
כמובן, שאם אתם משתמשים ב- Elasticsearch בתור מנוע עיקרי לאכסון ולתשאול המידע שלכם, ופה ושם מחזיקים גם אפליקציה מעל SQL Server (או כל DB רלציוני אחר), הצורך בכזה פיתרון ברור – אתם רוצים כנראה שבסוף כל המידע יהיה איפה שאתם מתשאלים אותו.
אבל, גם אם אתם לא משתמשים בהווה ב- Elasticsearch, כדאי להיות מודעים לפחות לאפשרויות שהוא נותן, בדגש על אינדוקס טקסטואלי מעולה. כיום, הפיתרון כאשר נדרש אינדוקס טקסטואלי ב- SQL Server הוא Full Text Search (בקיצור, FTS). אין ספק ש- FTS הוא פיתרון קל להטמעה ושהוא משולב טוב מאד עם התשתית של SQL Server (אותו storage, כלול בגיבויים שלכם, נתמך ב-AlwysOn וכו’). אבל, FTS יכול לא להספיק. למשל –כאשר מדובר בכמויות מידע גדולות, ואתם צריכים יכולות scale-out לפיתרון האינדוקס,. או לחלופין, אם אתם צריכים שליטה טובה יותר בצורה שבה המידע מתאנדקס ונשמר מהשליטה המוגבלת יחסית שמתאפשרת עם FTS.
המטרה ודרישות הקדם
המטרה שלנו בפוסט הזה, היא בהינתן DB מבוסס SQL Server, שמידע נכנס (ו/או מתעדכן) בטבלאות שבו, להזרים את המידע הזה ל- Elasticsearch שלנו.
השיטה שבה נעשה את זה מבוססת על הרצת שאילתות מחזורית מול ה- DB, שמביאה כל פעם את המידע שהתווסף/השתנה. לשיטה הזאת של crawling יש יתרונות וחסרונות, שכתבתי עליהם לא מעט בפוסט שעסק בטכניקות לבצע מעקב אחרי מידע שהשתנה ב- SQL Server. על בסיס התשתית שאני אראה אפשר להשתמש גם בשיטות אחרות לחילוץ מידע, כמו אלה שמוסברות בפוסט הנ”ל אבל בדוגמאות אני אתמקד בשיטה הפשוטה ביותר של ביצוע crawling על ה- data (ניתן היה להשתמש גם ב- CDC ובדברים נוספים על בסיס אותה התשתית).
הכלי שבו נשתמש כדי להריץ את השאילתות באופן מחזורי ולהכניס אותם ל- Elasticsearch נקרא Logstash. מדובר למעשה בכלי ETL פשוט, מבית היוצר של המפתחים של Elasticsearch, שמאפשר להגדיר מספר inputs – מקורות שמהם מגיע ה- data, את הטרנספורמציות שהמידע יעבור ולאן המידע יוכנס בסוף (במקרה שלנו, Elasticsearch).
נקודת המוצא של הפוסט הזה היא שכבר יש לכם cluster של Elasticsearch שאליו יגיע המידע ו- instance של Logstash שאליו נוסיף את הקונפיגורציה שלנו. רצוי גם שיהיה Kibana (ממשק ויזואלי לתחקור המידע). ההתקנה לא מורכבת, ואפשר למצוא הוראות התקנה לכל הרכיבים פה (מבין הדברים נשתמש בפועל רק ב- Elasticsearch ו- Logstash)
הורדת JDBC Driver ל- SQL Server
Logstash זאת אפליקציה מבוססת Java, כאשר ה- plugin ל- Logstash שבו נשתמש צריך את ה- JDBC Driver המתאים ל- SQL Server. לצורך כך, נוריד את ה-package שכולל את ה- JDBC Driver, ונחלץ את ה- JAR שאנחנו צריכים מה- tar.gz. הוא נמצא במיקום enu\jre8\sqljdbc42.jar. נעתיק אותו ונשמור אותו במקום ידוע בשרת שלנו שמריץ את ה- Logstash.
דוגמא 1: הרצת שאילתה מחזורית שמביאה רק רשומות חדשות
לעיתים, יש לנו טבלאות שבהן אין עדכונים ומחיקות, אלא רק מתווספים ערכים חדשים. למשל, נדמיין טבלה שמכילה מידע על היסטוריית ההדפסות בארגון. ה- clustered index של הטבלה הוא על ה- ID המספרי הרץ (ולפיו נעשה את ה- crawling בשאילתה). כך נראית הטבלה:
CREATE TABLE [dbo].[PrintJobsHistory](
[ID] [int] IDENTITY(1,1) NOT NULL,
[SentDateTime] [datetime2](7) NOT NULL,
[DocumentName] [nvarchar](150) NOT NULL,
[Username] [nvarchar](150) NOT NULL,
[NumPages] [int] NOT NULL,
[NumCopies] [int] NOT NULL,
[PrinterName] [varchar](150) NOT NULL,
[ClientComputerName] [nvarchar](150) NOT NULL,
[ClientIP] [varchar](50) NOT NULL,
CONSTRAINT [PK_PrintJobsHistory] PRIMARY KEY CLUSTERED
(
[ID] ASCמה
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
השליפה שנרצה שתרוץ היא שליפה פשוטה, שמביאה את כל העמודות בטבלה החל מה- ID האחרון שנשלף. כלכומר, נרצה ש- logstash ישמור כל פעם בצד קובץ state שמכיל את ה- ID האחרון, ויכניס אותו כפרמטר לשאילתה שמורצת כל פעם – כדי להביא לנו את כל הרשומות החדשות.
מבחינת טרנספורמציות, לא נרצה לבצע פה אף טרנספורמציה מיוחדת. הדבר היחיד שנרצה, הוא לוודא שהשדה של ה- @timestamp (שדה שמייצג את זמן האירוע) ימופה נכון, לזמן הדפסה, שהוא הערך בעמודת SentDateTime. בערך הזה נשתמש כדי לעשות “partitioning” לאינדקסים לפי ימים (מה שיאפשר תחזוקה נוחה יותר).
(כהערת אגב, אפשר היה לעשות פה גם משהו יותר מתוחכם, כמו למשל להגדיר ש- ClientIP יישמר כ- data type של כתובת IP ב- Elasticsearch עצמו. אולם, זה היה דורש לשנות את ה- mappings ב- Elasticsearch ואני מעדיף לא להיכנס לזה כחלק מהפוסט הנ”ל)
בואו נראה איך נראית קונפיגורציית ה- Logstash שמאפשרת את מה שאנחנו רוצים:
input {
jdbc {
jdbc_driver_library => "/tmp/jdbc/sqljdbc42.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://192.168.50.1:1433;databasename=Sample"
jdbc_user => "testuser"
jdbc_password => "testpass"
schedule => "*/1 * * * *"
statement => "
SELECT ID
,[@timestamp] = CONVERT(datetime, SentDateTime, 21)
,[SentDateTime]
,[DocumentName]
,[Username]
,[NumPages]
,[NumCopies]
,[PrinterName]
,[ClientComputerName]
,[ClientIP]
FROM [Sample].[dbo].[PrintJobsHistory]
WHERE ID > :sql_last_value
"
tracking_column => "ID"
use_column_value => "true"
last_run_metadata_path => "/tmp/logstash_printer_last_id.state"
lowercase_column_names => false
type => "printHistory"
}
}
output {
if [type] == "printHistory" {
elasticsearch {
hosts => ["127.0.0.1"]
index => "printjobs-%{+YYYY.MM.dd}"
}
stdout { codec => rubydebug }
}
}
נראה קצת גדול, אז בואו נפרק את זה רגע לחלקים.
יש לנו שני חלקים מרכזיים פה: ה- input, שכולל את כל ה- data sources שלנו וה- output שכולל את המקומות שבהם אנחנו רוצים לשמור את המידע. למשל, בדוגמא הזאת אין לנו filter (שהוא חלק אפשרי נוסף) שמגדיר טרנספורמציות ותנאים על המידע.
ה- input
בקונפיגורציה הזאת יש לנו input יחיד, שמבוסס על ה- input plugin שנקרא jdbc. בתוכו מופיעות ההגדרות השונות (להסבר על כל הפרמטרים שאפשר להגדיר ניתן לקרוא בתיעוד הרשמי):
- jdbc_driver_library: הנתיב לקובץ JAR של ה- JDBC Driver ל- SQL Server שהורדנו ושמרנו מקודם
- jdbc_driver_class: השם של המימוש ב- JAR ל- JDBC Driver. ספיציפית, תתייחסו לערך הזה כקבוע במקרה שאתם עובדים עם SQL Server.
- jdbc_connection_string: ה- connection string ל- DB, בסינטקס של JDBC. במקרה שלנו – התחברות לשרת בכתובת ה- IP של 192.168.50.1, ל- DB בשם Sample
- jdbc_user: שם המשתמש שאיתו מתבצעת ההתחברות (SQL Authentication)
- jdbc_password: הסיסמא
- schedule: התזמון של הרצת השאילתה, ב- syntax של cron. בדוגמא שלנו, הרצה כל דקה.
- statement: השליפת SQL עצמה. אם אתם מעדיפים, אפשר להפריד גם לקובץ נפרד ולתת את הנתיב שלו ב- statement_filepath. שימו לב שבשליפה עצמה, בתנאי, אנחנו מבצעים שליפה ומתנים על זה שה- ID גדול מ- sql_last_value. מה זה בדיוק sql_last_value? הערך האחרון שראינו בשליפה של העמודה ID. למה דווקא העמודה ID? כי זה שם העמודה שהעברנו בהגדרה של tracking_column.
- tracking_column: שם העמודה שלפיה נעשה את הבאת הדלתאות בטבלה. כלומר, שם של עמודה שהערך של התא שלה בשורה האחרונה שחוזרת נשמר, ומועבר אלינו בתור פרמטר של sql_last_value לשליפה הבאה. שימו לב ששם העמודה צריך להיות באותיות קטנות (בלי קשר לצורה שהוא מופיע ב- SELECT)
- last_run_metadata_path: הנתיב שבו יישמר ה- state (הערך האחרון של ה- tracking_coolumn
- lowercase_column_names: האם להפוך את כל שמות העמודות ל- lower case. ה- default הוא כן לעשות את זה.
- type: ה- type של ה- data שאנחנו מאנדקסים. השם הזה ישמש אותנו בהמשך הקונפיגורציה כדי לפלטר ולהגיד שחלקים מסויימים חלים רק על דברים מה- type המסוים שאנחנו רוצים, ובנוסף – זה שם ה- type ב- Elasticsearch שיכול לשמש אותנו אח”כ לעדכון Mappings (ה- data types של השדות באינדקס, למשל)
לפני שנעבור ל- output, כמה הערות שכדאי לשים לב אליהן:
- ניתן לעקוב אחרי ערך בודד. כלומר, אתם לא יכולים לחלץ באמצעות ה- jdbc plugin ערכים של מספר עמודות מהשאילתה ולעקוב אחריהם
- אם השליפה שלכם קצת יותר מורכבת, וכוללת למשל הכנסה לטבלאות זמניות או פעולות שמייצרות למעשה הודעה על x rows affected, אבל שלא מצורף לה result sets, אתם עלולים להיתקל ב- exception שאומר statement did not return a resultset, על אף שכן חוזר בסופו של דבר result-set. כדי לפתור את זה, שימו SET NOCOUNT ON בראש ה- query שלכם, במידה שאתם מתעסקים עם יותר מ-statement יחיד של SELECT
ה- output
הגדרנו שימוש בשני output plugins. נתחיל דווקא מהשני, שהוא הכי פשוט – stdout. לצרכי debugging, נוח לראות את המידע נזרק גם ל- stdout.
הפלאגין output העיקרי שבו אנחנו עושים שימוש, הוא ה- elasticsearch. המטרה שלו, כמה לא מפתיע, זה לאפשר לאנדקס את המידע לתוך Elasticsearch. אנחנו מעבירים לו שני פרמטרים:
- hosts: רשימה של hosts שיכולים לשמש לטובת האינדוקס. במקרה שלי, אני עובד על VM שמריץ לי הכל, אז פשוט שמתי 127.0.0.1.
- index: השם של ה- index שלתוכו יישמר המידע. במקרה שלנו, printjobs שלאחריו יש מקף, ואז את התאריך. כלומר, יהיה לנו אינדקס לכל יום שבו הגיע מידע. זה נוח במובן של ניהול retention – קל ככה למחוק מידע היסטורי. כמובן, אפשר לשמור את הכל באינדקס אחד (ואז היינו כותבים רק printjobs, בלי ההמשך), או לעשות את החלוקה לפי ערך אחר. חשוב לשים לב ששם האינדקס חייב להיות מורכב רק מאותיות קטנות. אם הוא יכיל אותיות גדולות, למשל, לא תקבלו שגיאה – אבל מידע פשוט לא יתאנדקס לכם.
דוגמא 2: הוספת של מקור מידע נוסף, עם רשומות שמתעדכנות
הדוגמא הבאה תזכיר מאד את הדוגמא הקודמת – עם שוני מרכזי. הפעם, אנחנו נרצה לתמוך ברשומות שמתעדכנות. כלומר, נרצה שכאשר מתבצע עדכון לרשומה, ה-document המתאים יתעדכן גם באינדקס שלנו ב- Elasticsearch.
בנוסף, הדוגמא הזאת תראה לנו איך מתמודדים כשיש לנו יותר ממקור מידע אחד.
הטבלה שנסתכל עליה הפעם נראית ככה:
CREATE TABLE [dbo].[Users](
[ID] [int] IDENTITY(1,1) NOT NULL,
[Username] [varchar](100) NOT NULL,
[FirstName] [nvarchar](50) NOT NULL,
[LastName] [nvarchar](50) NOT NULL,
[Email] [nvarchar](150) NOT NULL,
[AboutMe] [nvarchar](4000) NOT NULL,
[CreationTime] [datetime2](7) NOT NULL,
[LastUpdated] [datetime2](7) NOT NULL,
CONSTRAINT [PK_Users] PRIMARY KEY CLUSTERED
(
[ID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
מה שנסתכל עליו עכשיו זה לא הקונפיגורציה של השליפה מול הטבלה הזאת בלבד, אלא איך נראית קונפיגורציה שמשלבת ביחד את מה שעשינו גם בדוגמא הקודמת. חשוב לציין, שהדרך של להכניס שני input-ים (ויותר) מסוגים שונים לתוך אותה הקונפיגורציה ולהפריד בינהם לפי תנאים על ה- type (מה שאנחנו עומדים לעשות) היא לא הדרך היחידה. ניתן גם לעשות הפרדה ע”י הגדרת מספר pipelines.
אז ככה נראית הקונפיגורציה עכשיו:
input {
jdbc {
jdbc_driver_library => "/tmp/jdbc/sqljdbc42.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://192.168.50.1:1433;databasename=Sample"
jdbc_user => "testuser"
jdbc_password => "testpass"
schedule => "*/1 * * * *"
statement_filepath => "/tmp/printjobs.sql"
tracking_column => "ID"
use_column_value => "true"
last_run_metadata_path => "/tmp/logstash_printer_last_id.state"
lowercase_column_names => false
type => "printHistory"
}
jdbc {
jdbc_driver_library => "/tmp/jdbc/sqljdbc42.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://192.168.50.1:1433;databasename=Sample"
jdbc_user => "testuser"
jdbc_password => "testpass"
schedule => "* * * * *"
tracking_column_type => "timestamp"
statement => "
SELECT ID,
Username,
FirstName,
LastName,
Email,
AboutMe,
CreationTime,
LastUpdated
FROM Users
WHERE LastUpdated > :sql_last_value
"
tracking_column => "LastUpdated"
use_column_value => "true"
last_run_metadata_path => "/tmp/logstash_users_last_timestamp.state"
lowercase_column_names => false
type => "users"
}
}
output {
if [type] == "printHistory" {
elasticsearch {
hosts => ["127.0.0.1"]
index => "printjobs-%{+YYYY.MM.dd}"
}
}
if [type] == "users" {
elasticsearch {
hosts => ["127.0.0.1"]
document_id => "%{ID}"
index => "users"
}
}
stdout { codec => rubydebug }
}
אני רוצה להסב את תשומת לבכם למס’ שינויים ביחס להגדרה של ה- input הקודם:
- למען הבהירות של הקונפיגורציה, אתם רואים שאפשר להעביר את חלק מה-SQL statements להיות בקובץ נפרד (העברתי למשל את ה- jdbc הראשון, שכבר ראינו בדוגמא הקודמת)
- בדוגמא הזאת, אנחנו עוקבים אחרי עמודה שהיא datetime2. לכן, הגדרתי tracking_column_type להיות timestamp (ה- default זה numeric). זה חשוב במיוחד עבור הערך הראשון שמועבר (כשעדיין אין קובץ state). אם זה numeric, אז זה 0. אחרת, זה ה-01/01/1970.
- ה- type של הדוגמא השנייה עכשיו הוא users. זה מאפשר לנו להבדיל גם בהתנייה על ה- type בחלק של ה- output, וגם ב-Elasticsearch, לטובת הגדרת mapping
- ב- output אנחנו רוצים התנהגות שונה עבור דברים שהם רשומות של הדפסה, כמו שהיה לנו קודם, ועבור משתמשים (כמו של ה- input החדש שהוספנו עכשיו). את זה אנחנו משיגים באמצעות הוספת ה- if בתוך הקונפיגורציה. כאמור, היה אפשר להפריד את זה ל- piplines שונים עם קונפיגורציות שונות, אבל זאת דוגמא לאיך עושים את זה באותה הקונפיגורציה (כי גם זה משהו ששימושי להכיר).
- ב- output, עבור דברים שהם מה- type של users, אני מציין document_id. כלומר, במקום לתת לו לייצר document_id חדש, אני מגדיר במפורש מה אמור להיות ה- document_id שלי. בהגדרה הזאת, אני משתמש בערך של אחד השדות שחזרו.
הפעולה שמתבצעת היא פעולת UPSERT. אם לא קיים document עם אותו ה- id, אז נוצר אחד חדש. אם קיים, אז אנחנו מעדכנים. - ב- output, ה- index מוגדר ללא timestamp, אלא כשם קבוע. למשל, במקרה הזה, פחות משמעותי לעשות חלוקה לאינדקסים לפי תאריך היצירה (כי יחסית יש פחות משתמשים מכל תאריך, אנחנו כנראה לא נעשה חיפושי range בד”כ, ובתרחיש הסטנדרטי לא ממש מקובל לעשות rentention ולמחוק יוזרים ישנים…).
- ה-index ב- output חייב להיות שונה מה- index קודם שהגדרנו, ששימש את ה- type הקודם. לפני גרסת Elasticsearch 6, היה ניתן להכניס מספר types שונים לאותו ה- index. החל מ- Elasticsearch 6, זה כבר לא אפשרי. לכן, אנחנו לא יכולים לערבב מספר types באותו האינדקס (ויש גם נימוקים טובים למה לא לעשות את זה בלינק).
- ומה עם מחיקות..? אז עקרונית, אין דרך straight forward לגרום למחיקת ה- documents מ-Elasticsearch כתוצאה מהשינויים בטבלאות. ההמלצה, במקרה הזה, היא להיצמד ל- soft deletes, שהם יופיעו כשינויים ולכן כן יסתנכרנו ל- Elasticsearch.
סיכום
כפי שראינו, ניתן יחסית בקלות לדאוג להזרמת נתונים מ- SQL Server ל- Elasticsearch, תוך שימוש ב- jdbc input plugin של Logstash. זה יכול להיות מאד שימושי אם יש לכם כבר סביבת Elasticsearch שאתם צריכים להזרים אליה נתונים מה- MSSQL שלכם, ויכול להיות שימושי גם אם אתם בוחרים להשתמש ב- Elasticsearch כתחליף ל- Full Text Search.
בהצלחה!