Compute the local_idx of each Post
This commit is contained in:
parent
2b1940b401
commit
335be14f7a
|
@ -17,6 +17,7 @@ DROP TABLE IF EXISTS threads CASCADE;
|
|||
DROP TABLE IF EXISTS posts CASCADE;
|
||||
DROP TABLE IF EXISTS attachments CASCADE;
|
||||
DROP TYPE IF EXISTS catalog_grid_result CASCADE;
|
||||
DROP TYPE IF EXISTS post_key CASCADE;
|
||||
DROP FUNCTION IF EXISTS update_post_body_search_index;
|
||||
DROP FUNCTION IF EXISTS fetch_top_threads;
|
||||
DROP FUNCTION IF EXISTS fetch_catalog;
|
||||
|
@ -170,6 +171,8 @@ $$ LANGUAGE sql;
|
|||
|
||||
*/
|
||||
|
||||
|
||||
-- Deprecated: this doesn't insert local_idx, seems better to just get all the posts in application
|
||||
CREATE OR REPLACE FUNCTION insert_posts_and_return_ids(new_posts posts[])
|
||||
RETURNS TABLE (post_id bigint, board_post_id bigint, thread_id bigint) AS $$
|
||||
WITH
|
||||
|
@ -199,6 +202,21 @@ $$ LANGUAGE sql;
|
|||
-- 1:21 for full db (nothing inserted)
|
||||
|
||||
|
||||
CREATE TYPE post_key AS
|
||||
( thread_id bigint
|
||||
, board_post_id bigint
|
||||
);
|
||||
|
||||
CREATE OR REPLACE FUNCTION get_posts(board_posts post_key[])
|
||||
RETURNS SETOF posts AS $$
|
||||
|
||||
SELECT *
|
||||
FROM posts
|
||||
WHERE (thread_id, board_post_id) IN (SELECT thread_id, board_post_id FROM unnest(board_posts))
|
||||
|
||||
$$ LANGUAGE sql;
|
||||
|
||||
|
||||
CREATE OR REPLACE FUNCTION fetch_top_threads(
|
||||
p_start_time TIMESTAMPTZ,
|
||||
lookback INT DEFAULT 10000
|
||||
|
@ -345,6 +363,7 @@ REVOKE EXECUTE ON FUNCTION fetch_top_threads FROM PUBLIC;
|
|||
REVOKE EXECUTE ON FUNCTION fetch_catalog FROM PUBLIC;
|
||||
REVOKE EXECUTE ON FUNCTION search_posts FROM PUBLIC;
|
||||
REVOKE EXECUTE ON FUNCTION update_post_body_search_index FROM PUBLIC;
|
||||
REVOKE EXECUTE ON FUNCTION get_posts FROM PUBLIC;
|
||||
|
||||
CREATE ROLE chan_archive_anon nologin;
|
||||
GRANT CONNECT ON DATABASE chan_archives TO chan_archive_anon;
|
||||
|
@ -356,6 +375,7 @@ GRANT SELECT ON attachments TO chan_archive_anon;
|
|||
GRANT EXECUTE ON FUNCTION fetch_catalog TO chan_archive_anon;
|
||||
GRANT EXECUTE ON FUNCTION fetch_top_threads TO chan_archive_anon;
|
||||
GRANT EXECUTE ON FUNCTION search_posts TO chan_archive_anon;
|
||||
GRANT EXECUTE ON FUNCTION get_posts TO chan_archive_anon;
|
||||
|
||||
-- GRANT usage, select ON SEQUENCE sites_site_id_seq TO chan_archive_anon;
|
||||
-- GRANT usage, select ON SEQUENCE boards_board_id_seq TO chan_archive_anon;
|
||||
|
@ -374,6 +394,7 @@ GRANT EXECUTE ON FUNCTION insert_posts_and_return_ids TO chan_archiver;
|
|||
GRANT EXECUTE ON FUNCTION fetch_top_threads TO chan_archiver;
|
||||
GRANT EXECUTE ON FUNCTION fetch_catalog TO chan_archiver;
|
||||
GRANT EXECUTE ON FUNCTION search_posts TO chan_archiver;
|
||||
GRANT EXECUTE ON FUNCTION get_posts TO chan_archiver;
|
||||
GRANT usage, select ON SEQUENCE sites_site_id_seq TO chan_archiver;
|
||||
GRANT usage, select ON SEQUENCE boards_board_id_seq TO chan_archiver;
|
||||
GRANT usage, select ON SEQUENCE threads_thread_id_seq TO chan_archiver;
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
BEGIN TRANSACTION;
|
||||
|
||||
DROP TYPE IF EXISTS post_key CASCADE;
|
||||
|
||||
-- DROP FUNCTION IF EXISTS get_posts;
|
||||
|
||||
CREATE TYPE post_key AS
|
||||
( thread_id bigint
|
||||
, board_post_id bigint
|
||||
);
|
||||
|
||||
CREATE OR REPLACE FUNCTION get_posts(board_posts post_key[])
|
||||
RETURNS SETOF posts AS $$
|
||||
|
||||
SELECT *
|
||||
FROM posts
|
||||
WHERE (thread_id, board_post_id) IN (SELECT thread_id, board_post_id FROM unnest(board_posts))
|
||||
|
||||
$$ LANGUAGE sql;
|
||||
|
||||
GRANT EXECUTE ON FUNCTION get_posts TO chan_archive_anon;
|
||||
GRANT EXECUTE ON FUNCTION get_posts TO chan_archiver;
|
||||
REVOKE EXECUTE ON FUNCTION get_posts FROM PUBLIC;
|
||||
|
||||
COMMIT;
|
103
src/Backfill.hs
103
src/Backfill.hs
|
@ -18,7 +18,8 @@ import System.Directory
|
|||
, createDirectoryIfMissing
|
||||
)
|
||||
import System.FilePath ((</>), (<.>), takeExtension)
|
||||
import Data.List (find, isSuffixOf, foldl')
|
||||
import Data.List (find, isSuffixOf, foldl', sortBy)
|
||||
import Data.Ord (comparing)
|
||||
import qualified Data.Set as Set
|
||||
import Data.Set (Set)
|
||||
import Data.Time.Clock.POSIX (posixSecondsToUTCTime)
|
||||
|
@ -200,9 +201,12 @@ readPosts settings board thread = do
|
|||
backupDir = backup_read_root settings </> Boards.pathpart board
|
||||
|
||||
|
||||
apiPostToPostKey :: Threads.Thread -> JSONPosts.Post -> Client.PostId
|
||||
apiPostToPostKey thread post = Client.PostId (Threads.thread_id thread) (JSONPosts.no post)
|
||||
|
||||
-- Convert Post to DbPost
|
||||
apiPostToArchivePost :: Threads.Thread -> JSONPosts.Post -> Posts.Post
|
||||
apiPostToArchivePost thread post =
|
||||
apiPostToArchivePost :: Int -> Threads.Thread -> JSONPosts.Post -> Posts.Post
|
||||
apiPostToArchivePost local_idx thread post =
|
||||
Posts.Post
|
||||
{ Posts.post_id = Nothing
|
||||
, Posts.board_post_id = JSONPosts.no post
|
||||
|
@ -213,6 +217,7 @@ apiPostToArchivePost thread post =
|
|||
, Posts.email = JSONPosts.email post
|
||||
, Posts.thread_id = Threads.thread_id thread
|
||||
, Posts.embed = JSONPosts.embed post
|
||||
, Posts.local_idx = local_idx
|
||||
}
|
||||
|
||||
-- | A version of 'concatMap' that works with a monadic predicate.
|
||||
|
@ -230,18 +235,19 @@ concatMapM op = foldr f (pure [])
|
|||
pure $ x_ ++ xs_
|
||||
|
||||
|
||||
setPostIdInPosts
|
||||
:: [(Sites.Site, Boards.Board, Threads.Thread, JSONPosts.Post, Posts.Post)]
|
||||
-> [ Client.PostId ]
|
||||
addPostsToTuples
|
||||
:: [(Sites.Site, Boards.Board, Threads.Thread, JSONPosts.Post)]
|
||||
-> [ Posts.Post ]
|
||||
-> [(Sites.Site, Boards.Board, Threads.Thread, JSONPosts.Post, Posts.Post)]
|
||||
setPostIdInPosts tuples ids = map f ids
|
||||
addPostsToTuples tuples posts = map f posts
|
||||
where
|
||||
post_map :: Map.Map (Int64, Int64) (Sites.Site, Boards.Board, Threads.Thread, JSONPosts.Post, Posts.Post)
|
||||
post_map = Map.fromList (map (\(a, b, c, i, j) -> ((Posts.thread_id j, Posts.board_post_id j), (a, b, c, i, j))) tuples)
|
||||
post_map :: Map.Map (Int64, Int64) (Sites.Site, Boards.Board, Threads.Thread, JSONPosts.Post)
|
||||
post_map = Map.fromList (map (\(a, b, c, d) -> ((Threads.thread_id c, JSONPosts.no d), (a, b, c, d))) tuples)
|
||||
|
||||
f :: Client.PostId -> (Sites.Site, Boards.Board, Threads.Thread, JSONPosts.Post, Posts.Post)
|
||||
f (Client.PostId { Client.post_id = asdf1, Client.thread_id = asdf2, Client.board_post_id = asdf3 }) =
|
||||
(\(a, b, c, i, j) -> (a, b, c, i, j { Posts.post_id = Just asdf1 })) (post_map Map.! (asdf2, asdf3))
|
||||
f :: Posts.Post -> (Sites.Site, Boards.Board, Threads.Thread, JSONPosts.Post, Posts.Post)
|
||||
f new_post =
|
||||
(\(a, b, c, d) -> (a, b, c, d, new_post))
|
||||
(post_map Map.! (Posts.thread_id new_post, Posts.board_post_id new_post))
|
||||
|
||||
|
||||
fileToAttachment :: Int -> Posts.Post -> JS.File -> At.Attachment
|
||||
|
@ -520,6 +526,57 @@ processFiles settings tuples = do -- perfect just means that our posts have ids,
|
|||
in Map.insert pid (x : l) accMap
|
||||
|
||||
|
||||
createNewPosts
|
||||
:: JSONSettings
|
||||
-> [ (Threads.Thread, JSONPosts.Post, Client.PostId) ]
|
||||
-> IO [ Posts.Post ]
|
||||
createNewPosts settings tuples = do
|
||||
existing_post_results <- Client.getPosts settings (map (\(_, _, c) -> c) tuples)
|
||||
existing_posts <- either handleError return existing_post_results
|
||||
|
||||
let existing_set :: Set (Int64, Int64) = Set.fromList (map (\x -> (Posts.thread_id x, Posts.board_post_id x)) existing_posts)
|
||||
|
||||
let to_insert_list :: [ (Threads.Thread, JSONPosts.Post, Client.PostId) ] =
|
||||
sortBy (comparing $ \(_, _, p) -> Client.board_post_id p) $
|
||||
determineNew tuples existing_set
|
||||
|
||||
-- Map of thread_id to the largest local_idx value (which would be the number of the last post in the thread)
|
||||
let local_idx :: Map.Map Int64 Int = foldr (Map.unionWith max) Map.empty $
|
||||
map (\x -> Map.singleton (Posts.thread_id x) (Posts.local_idx x)) existing_posts
|
||||
|
||||
let insert_posts = fst $ foldl' foldFn ([], local_idx) to_insert_list
|
||||
|
||||
-- posts to insert are the posts that are not in existing_posts
|
||||
-- so we create a Set (thread_id, board_post_id) ✓
|
||||
-- then check every tuples against the set and the ones not in the set get added to a to_insert_list ✓
|
||||
-- also for every tuples we need to compute a local_idx
|
||||
-- so we create a Map index_map from thread_id to local_idx ✓
|
||||
-- - for existing_posts
|
||||
-- - need to compare posts already in the map with another post and keep the max local_idx ✓
|
||||
-- to get the new local_idx, we must order the to_insert_list by board_post_id, and look up each entry ✓
|
||||
|
||||
posts_result <- Client.postPosts settings insert_posts
|
||||
new_posts <- either handleError return posts_result
|
||||
return $ existing_posts ++ new_posts
|
||||
|
||||
where
|
||||
handleError err = print err >> exitFailure
|
||||
|
||||
determineNew :: [(Threads.Thread, JSONPosts.Post, Client.PostId)] -> Set (Int64, Int64) -> [(Threads.Thread, JSONPosts.Post, Client.PostId)]
|
||||
determineNew ts existing_set = filter (\(_, _, c) -> Set.notMember (Client.thread_id c, Client.board_post_id c) existing_set) ts
|
||||
|
||||
foldFn :: ([Posts.Post], Map.Map Int64 Int) -> (Threads.Thread, JSONPosts.Post, Client.PostId) -> ([Posts.Post], Map.Map Int64 Int)
|
||||
foldFn (posts, idx_map) (t, p, c) =
|
||||
case Map.lookup thread_id idx_map of
|
||||
Nothing -> ((post 1) : posts, Map.insert thread_id 1 idx_map)
|
||||
Just i -> ((post (i + 1)) : posts, Map.insert thread_id (i + 1) idx_map)
|
||||
|
||||
where
|
||||
post :: Int -> Posts.Post
|
||||
post i = apiPostToArchivePost i t p
|
||||
|
||||
thread_id = Client.thread_id c
|
||||
|
||||
processBoard :: JSONSettings -> Sites.Site -> Boards.Board -> IO ()
|
||||
processBoard settings site board = do
|
||||
let catalogPath = backupDir </> "catalog.json"
|
||||
|
@ -535,23 +592,21 @@ processBoard settings site board = do
|
|||
|
||||
all_posts_on_board :: [(Threads.Thread, [ JSONPosts.Post ])] <- mapM (readPosts settings board) all_threads_for_board
|
||||
|
||||
let tuples :: [(Sites.Site, Boards.Board, Threads.Thread, JSONPosts.Post, Posts.Post)] = concatMap
|
||||
(\(t, posts) -> map (\p -> (site, board, t, p, apiPostToArchivePost t p)) posts)
|
||||
|
||||
let tuples :: [(Sites.Site, Boards.Board, Threads.Thread, JSONPosts.Post)] = concatMap
|
||||
(\(t, posts) -> map (\p -> (site, board, t, p)) posts)
|
||||
all_posts_on_board
|
||||
|
||||
posts_result <- Client.postPosts settings (map (\(_, _, _, _, d) -> d) tuples)
|
||||
posts_result :: [ Posts.Post ] <- createNewPosts settings (map (\(_, _, c, d) -> (c, d, apiPostToPostKey c d)) tuples)
|
||||
|
||||
case posts_result of
|
||||
Left err -> print err
|
||||
Right (new_ids :: [ Client.PostId ]) -> do
|
||||
putStrLn "Sum of post_ids:"
|
||||
print $ sum $ map Client.post_id new_ids
|
||||
putStrLn "Sum of board_post_ids:"
|
||||
print $ sum $ map Client.board_post_id new_ids
|
||||
putStrLn "Sum of post_ids:"
|
||||
print $ sum $ map (fromJust . Posts.post_id) posts_result
|
||||
putStrLn "Sum of board_post_ids:"
|
||||
print $ sum $ map Posts.board_post_id posts_result
|
||||
|
||||
let perfect_post_pairs = setPostIdInPosts tuples new_ids
|
||||
let perfect_post_pairs = addPostsToTuples tuples posts_result
|
||||
|
||||
processFiles settings perfect_post_pairs
|
||||
processFiles settings perfect_post_pairs
|
||||
|
||||
Left errMsg ->
|
||||
putStrLn $ "Failed to parse the JSON file in directory: "
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit 1b08b3ac079c6032e1090e79a8074d4cfd9ef24f
|
||||
Subproject commit aed106f40e24125d9057d3a9dda934a41c6d68a0
|
|
@ -12,6 +12,7 @@ module DataClient
|
|||
, postBoards
|
||||
, getThreads
|
||||
, postThreads
|
||||
, getPosts
|
||||
, postPosts
|
||||
, getAttachments
|
||||
, postAttachments
|
||||
|
@ -26,6 +27,7 @@ import qualified Data.ByteString.Lazy.Char8 as LC8
|
|||
import Data.List (intercalate)
|
||||
import Data.Aeson
|
||||
( eitherDecode
|
||||
, ToJSON
|
||||
, FromJSON
|
||||
, (.=)
|
||||
, object
|
||||
|
@ -42,11 +44,12 @@ import qualified Common.AttachmentType as Attachments
|
|||
import qualified Common.PostsType as Posts
|
||||
import Common.Network.HttpClient
|
||||
|
||||
|
||||
data PostId = PostId
|
||||
{ post_id :: Int64
|
||||
, board_post_id :: Int64
|
||||
, thread_id :: Int64
|
||||
} deriving (Show, Generic, FromJSON)
|
||||
{ board_post_id :: Int64
|
||||
, thread_id :: Int64
|
||||
} deriving (Show, Generic, ToJSON)
|
||||
|
||||
|
||||
getSiteBoards :: T.JSONSettings -> Int -> IO (Either HttpError [ Boards.Board ])
|
||||
getSiteBoards settings site_id_ = get settings path >>= return . eitherDecodeResponse
|
||||
|
@ -103,6 +106,7 @@ postThreads settings threads =
|
|||
getAllSites :: T.JSONSettings -> IO (Either HttpError [ Sites.Site ])
|
||||
getAllSites settings = get settings "/sites" >>= return . eitherDecodeResponse
|
||||
|
||||
|
||||
getThreads :: T.JSONSettings -> Int -> [ Int ] -> IO (Either HttpError [ Threads.Thread ])
|
||||
getThreads settings board_id board_thread_ids =
|
||||
get settings path >>= return . eitherDecodeResponse
|
||||
|
@ -155,15 +159,39 @@ postAttachments settings attachments = do
|
|||
payload = encode attachments
|
||||
|
||||
|
||||
-- | Function to handle each chunk.
|
||||
getPostsChunk :: T.JSONSettings -> [ PostId ] -> IO (Either HttpError [Posts.Post])
|
||||
getPostsChunk settings chunk =
|
||||
post settings "/rpc/get_posts" payload False >>= return . eitherDecodeResponse
|
||||
|
||||
where
|
||||
payload = encode $ object [ "board_posts" .= chunk ]
|
||||
|
||||
|
||||
getPosts :: T.JSONSettings -> [ PostId ] -> IO (Either HttpError [Posts.Post])
|
||||
getPosts settings xs = do
|
||||
results <- forM (chunkList chunkSize xs) (getPostsChunk settings)
|
||||
return $ combineResults results
|
||||
where
|
||||
chunkSize = 1000
|
||||
|
||||
|
||||
postPosts
|
||||
:: T.JSONSettings
|
||||
-> [ Posts.Post ]
|
||||
-> IO (Either HttpError [ PostId ])
|
||||
postPosts settings posts =
|
||||
post settings "/rpc/insert_posts_and_return_ids" payload True >>= return . eitherDecodeResponse
|
||||
-> IO (Either HttpError [ Posts.Post ])
|
||||
postPosts settings posts = do
|
||||
post settings "/posts" payload True >>= return . eitherDecodeResponse
|
||||
|
||||
where
|
||||
payload = encode $ object [ "new_posts" .= posts ]
|
||||
payload = encode posts
|
||||
|
||||
|
||||
-- Old type:
|
||||
-- postPosts
|
||||
-- :: T.JSONSettings
|
||||
-- -> [ Posts.Post ]
|
||||
-- -> IO (Either HttpError [ PostId ])
|
||||
|
||||
|
||||
eitherDecodeResponse :: (FromJSON a) => Either HttpError LBS.ByteString -> Either HttpError a
|
||||
|
|
Loading…
Reference in New Issue