Memakai Stream Di Node.js
Untuk membaca sebuah file, saya dapat menggunakan modul fasilitas yang ditawarkan oleh fs
seperti readFile()
untuk versi asynchronous atau readFileSync()
untuk versi synchronous. Kedua functions tersebut akan membaca seluruh isi file dan menyimpannya ke memori (RAM) sebagai Buffer
. Namun, sama seperti proses lainnya di sistem operasi, aplikasi Node.js juga memiliki batasan alokasi memori. Sebagai contoh, pada sistem saya, ukuran Buffer
maksimum adalah 2 GB. Bila saya membaca file lebih besar dari 2 GB dengan readFile()
atau readFileSync()
, saya akan memperoleh pesan kesalahan seperti Range Error [ERR_FS_FILE_TOO_LARGE]: File size is greater than 2 GB
. Apa yang harus saya lakukan bila tetap ingin memproses file tersebut? Saya bisa menggunakan Stream yang akan memproses file dalam bentuk potongan-potongan kecil. Secara garis besar, walaupun lebih rumit, stream akan lebih efisien terutama dalam memproses file besar.
Node.js mendukung dua jenis implementasi stream: Stream dan Web Streams API. Saat ini status dukungan Web Streams API masih experimental. Web Streams adalah API yang sama seperti yang dipakai oleh programmer front-end di browser. Walaupun demikian, sama seperti di Node.js, Web Streams juga sepertinya belum didukung sempurna di semua browser. Berdasarkan https://caniuse.com/streams, hanya Chrome dan Edge terbaru yang mengimplementasikannya, sementara Firefox masih belum sepenuhnya mendukung spesifikasi tersebut. Oleh sebab itu, pada artikel ini, saya akan fokus pada stream API yang stabil (yang merupakan bawaan Node.js).
Secara garis besar, Node.js memiliki 4 jenis stream: Writable
, Readable
, Duplex
dan Transform
. Sesuai dengan namanya, Writable
dipakai untuk menulis dan Readable
dipakai untuk membaca. Duplex
pada dasarnya adalah sebuah Readable
yang juga Writable
sehingga mendukung operasi baca tulis. Transform
adalah sebuah Duplex
yang mendukung pemrosesan sehingga apa yang ditulis berbeda dengan apa yang dibaca.
Karena ingin membaca file, saya akan mendapatkan sebuah instance Readable
dari fs
, seperti pada contoh berikut ini:
Salah fitur yang sangat berguna dalam menyederhakan kode program adalah fasilitas pipe()
di stream. Dengan pipe()
, saya bisa menggabungkan dua atau lebih file. Sebagai contoh, kode program berikut ini akan menyalin file dengan menggunakan stream:
Kode program di atas sama seperti source.pipe(target)
yang akan mengirim source
stream ke target
stream. Penggunaan pipeline()
memungkinkan saya mendaftarkan callback yang berperan sebagai error handler di setiap stream yang terlibat (sehingga tidak perlu mendaftarkan satu per satu dengan kode seperti stream.on('error', cb)
).
Pada kode program di atas, bagaimana bila saya ingin menampilkan indikator selama proses penyalinan berlangsung? Saya dapat menggunakan sebuah turunan Transform
yang disebut sebagai PassThrough
. Ini mirip seperti tap()
di RxJS: ia tetap akan melewatkan stream asal ke stream tujuan apa adanya, tapi juga memungkinkan saya untuk menambahkan sebuah proses yang tidak melakukan transformasi seperti melakukan logging. Sebagai contoh, saya mengubah kode program menjadi seperti berikut ini:
Pada kode program di atas, stream-nya terlihat seperti pada diagram berikut ini:
Sebuah stream boleh saja memiliki lebih dari satu pipe()
. Ini disebut juga sebagai forking stream. Pada contoh di atas, keluaran dari stream passThrough
akan diarahkan ke target
(file yang salin) dan juga ke sebuah Transform
stream bernama CounterStream
yang akan mengembalikan ukuran yang sudah diproses dalam satuan GB. Hasil dari CounterStream
ini kemudian diarahkan ke process.stdout
yang merupakan sebuah stream yang dipakai untuk menulis ke layar (yang biasanya dilakukan melalui console.log()
).
Bagaimana bila proses yang dilakukan untuk sebuah potongan file oleh stream sangat lambat? Atau, kondisi dimana Readable
bekerja sangat cepat, mengirim banyak data ke Writable
yang lambat dalam mengkonsumsinya? Writable
tersebut tetap akan menyimpan data yang masuk secara sementara. Apa yang terjadi bila data sementara yang belum diproses oleh Writable
semakin menumpuk? Memori akan menjadi penuh dan kesalahan seperti saat memakai readFile()
atau writeFile()
akan timbul kembali! Oleh sebab itu, stream hanya akan menampung data sementara sebanyak nilai yang ditentukan oleh highWaterMark
. Nilai ini default-nya adalah 16 kb dan dapat diatur pada saat stream dibuat. Setelah highWaterMark
tercapai, stream harus berhenti bekerja dan menunggu hingga apa yang tertunda selesai dikerjakan. Proses ini disebut sebagai backpressure. Stream bawaan Node.js sudah mendukung backpressure akan tetapi saat membuat stream sendiri, pembuat stream bertanggung jawab untuk memastikan stream-nya mendukung backpressure.
Untuk contoh yang lebih realistis, saya akan membuat kode program yang membaca daftar alamat IP dari file CSV (kolom ketiga dan kolom kelima) dan menghasilkan file yang berisi daftar hostname yang unik dari file tersebut. File sumber saya sudah dikompres dalam format gzip
sehingga program perlu melakukan gunzip
terlebih dahulu. Kode programnya terlihat seperti berikut ini:
Pipeline stream yang berlangsung akan terlihat seperti berikut ini:
source
adalah sebuah Readable
yang dibuat dengan menggunakan fs.createReadStream()
. Ini adalah bawaan Node.js.
gunzip
adalah sebuah Transform
stream yang melakukan transformasi dari file gzip ke versi yang sudah di-ekstrak. Ini adalah bawaan Node.js.
ExtractIpAddress
adalah sebuah Transform
stream buatan sendiri. Stream ini menerima satu atau lebih baris CSV (yang dipisahkan dengan tanda koma). Transformasi stream tidak harus selalu satu masukan ke satu keluaran (one to one). Sebagai contoh, pada stream ini, untuk setiap baris CSV akan ada dua keluaran: nilai kolom ketiga dan nilai kolom kelima. Ini terlihat dari this.push()
yang dikerjakan dua kali untuk setiap baris.
Distinct
adalah sebuah Transform
stream buatan sendiri. Stream ini hanya akan menulis keluaran bila sebelumnya belum pernah ada sehingga nilai keluarannya tidak mengandung duplikat. Pada stream ini, terlihat bahwa satu masukan bahkan boleh tidak memiliki keluaran sama sekali. Sebagai contoh, bila nilai sudah ada, saya tidak memanggil this.push()
dan langsung memanggil callback
sehingga proses akan lanjut ke nilai masukan berikutnya.
ResolveDNS
adalah sebuah Transform
stream buatan sendiri. Stream ini akan menerjemahkan masukan berupa alamat IP menjadi sebuah nama yang dicari melalui dns.reverse()
. Karena stream ini akan menggunakan jaringan untuk mengakses DNS server, boleh dibilang stream ini adalah yang paling lambat dan menjadi bottleneck.
output
adalah sebuah Writable
yang dibuat dengan menggunakan fs.createWriteStream()
. Ini adalah bawaan Node.js.
Pada setiap stream buatan sendiri, saya memberikan nilai true
untuk objectMode
di constructor. Salah satu dampaknya adalah nilai highWaterMark
kini bukan lagi merujuk pada ukuran melainkan jumlah elemen. Sebagai contoh, nilai 100
pada highWaterMark
berarti buffer untuk 100 elemen, bukan buffer sebesar 100 KB. Selain itu, karena tipe stream buatan sendiri ini adalah Transform
, saya tidak perlu menangani backpressure. Bila ini adalah Readable
atau Writable
, saya perlu memastikan bahwa bila this.push()
mengembalikan nilai false
, stream harus berhenti hingga event drain
muncul.
Saat kode program di atas dijalankan, ia akan menghasilkan file output.txt
. Salah satu karakteristik penggunaan stream adalah saat program masih berjalan, saya bisa melihat hostname yang berhasil di-resolve di output.txt
dengan tail -f output.txt
tanpa harus menunggu seluruh alamat IP selesai diproses.